Skip to content

Commit

Permalink
[UT] Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Aug 29, 2023
1 parent de439af commit 8c0d864
Show file tree
Hide file tree
Showing 8 changed files with 976 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ public ExactlyOnceLabelGeneratorSnapshot snapshot(long checkpointId) {
checkpointId, db, table, labelPrefix, numberOfSubtasks, subTaskIndex, nextId.get());
}

public String getLabelPrefix() {
return labelPrefix;
}

public String getDb() {
return db;
}

public String getTable() {
return table;
}

public int getNumberOfSubtasks() {
return numberOfSubtasks;
}

public int getSubTaskIndex() {
return subTaskIndex;
}

public long getNextId() {
return nextId.get();
}

public ExactlyOnceLabelGenerator.LabelDbTableSubtask createLabelDbTableSubtask() {
return new ExactlyOnceLabelGenerator.LabelDbTableSubtask(labelPrefix, db, table, subTaskIndex);
}

public static String genLabel(String labelPrefix, String table, int subTaskIndex, long id) {
return String.format("%s-%s-%s-%s", labelPrefix, table, subTaskIndex, id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

package com.starrocks.connector.flink.table.sink;

import com.starrocks.data.load.stream.LabelGenerator;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,7 +56,7 @@ public ExactlyOnceLabelGeneratorFactory(String labelPrefix, int numberOfSubtasks
}

@Override
public synchronized LabelGenerator create(String db, String table) {
public synchronized ExactlyOnceLabelGenerator create(String db, String table) {
Map<String, ExactlyOnceLabelGenerator> tableMap = labelGenerators.computeIfAbsent(db, key -> new HashMap<>());
ExactlyOnceLabelGenerator generator = tableMap.get(table);
if (generator == null) {
Expand All @@ -83,7 +83,7 @@ public synchronized List<ExactlyOnceLabelGeneratorSnapshot> snapshot(long checkp
public synchronized void restore(List<ExactlyOnceLabelGeneratorSnapshot> snapshots) {
Map<ExactlyOnceLabelGenerator.LabelDbTableSubtask, ExactlyOnceLabelGeneratorSnapshot> map = new HashMap<>();
for (ExactlyOnceLabelGeneratorSnapshot snapshot : snapshots) {
if (snapshot.getSubtaskIndex() != subtaskIndex && !snapshot.getLabelPrefix().equals(labelPrefix)) {
if (snapshot.getSubTaskIndex() != subtaskIndex || !snapshot.getLabelPrefix().equals(labelPrefix)) {
LOG.info("Skip snapshot: {}", snapshot);
continue;
}
Expand All @@ -93,6 +93,9 @@ public synchronized void restore(List<ExactlyOnceLabelGeneratorSnapshot> snapsho
// Sanity check that there should not have duplicated snapshot for a LabelDbTableSubtask
if (oldSnapshot != null) {
LOG.warn("Find duplicate snapshot, old snapshot: {}, new snapshot: {}", oldSnapshot, snapshot);
if (snapshot.getNextId() < oldSnapshot.getNextId()) {
continue;
}
}
map.put(meta, snapshot);
}
Expand All @@ -105,4 +108,11 @@ public synchronized void restore(List<ExactlyOnceLabelGeneratorSnapshot> snapsho
LOG.info("Restore snapshot: {}, generator: {}", snapshot, generator);
}
}

@VisibleForTesting
public long numGenerators() {
return labelGenerators.values().stream()
.mapToInt(m -> m.values().size())
.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package com.starrocks.connector.flink.table.sink;

import com.google.common.base.Objects;

/** Snapshot for the label generator. */
public class ExactlyOnceLabelGeneratorSnapshot {

Expand All @@ -34,18 +36,18 @@ public class ExactlyOnceLabelGeneratorSnapshot {
/** The number of subtasks (the parallelism of the sink) when this snapshot is created. */
private final int numberOfSubtasks;
/** The index of the subtask that creates this snapshot. */
private final int subtaskIndex;
private final int subTaskIndex;
/** Next id used to create label. */
private final long nextId;

public ExactlyOnceLabelGeneratorSnapshot(long checkpointId, String db, String table,
String labelPrefix, int numberOfSubtasks, int subtaskIndex, long nextId) {
String labelPrefix, int numberOfSubtasks, int subTaskIndex, long nextId) {
this.checkpointId = checkpointId;
this.db = db;
this.table = table;
this.labelPrefix = labelPrefix;
this.numberOfSubtasks = numberOfSubtasks;
this.subtaskIndex = subtaskIndex;
this.subTaskIndex = subTaskIndex;
this.nextId = nextId;
}

Expand All @@ -69,16 +71,46 @@ public int getNumberOfSubtasks() {
return numberOfSubtasks;
}

public int getSubtaskIndex() {
return subtaskIndex;
public int getSubTaskIndex() {
return subTaskIndex;
}

public long getNextId() {
return nextId;
}

public ExactlyOnceLabelGenerator.LabelDbTableSubtask createLabelDbTableSubtask() {
return new ExactlyOnceLabelGenerator.LabelDbTableSubtask(labelPrefix, db, table, subtaskIndex);
return new ExactlyOnceLabelGenerator.LabelDbTableSubtask(labelPrefix, db, table, subTaskIndex);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExactlyOnceLabelGeneratorSnapshot snapshot = (ExactlyOnceLabelGeneratorSnapshot) o;
return checkpointId == snapshot.checkpointId
&& numberOfSubtasks == snapshot.numberOfSubtasks
&& subTaskIndex == snapshot.subTaskIndex
&& nextId == snapshot.nextId
&& Objects.equal(db, snapshot.db)
&& Objects.equal(table, snapshot.table)
&& Objects.equal(labelPrefix, snapshot.labelPrefix);
}

@Override
public int hashCode() {
return Objects.hashCode(
checkpointId,
db,
table,
labelPrefix,
numberOfSubtasks,
subTaskIndex,
nextId);
}

@Override
Expand All @@ -89,7 +121,7 @@ public String toString() {
", table='" + table + '\'' +
", labelPrefix='" + labelPrefix + '\'' +
", numberOfSubtasks=" + numberOfSubtasks +
", subtaskIndex=" + subtaskIndex +
", subtaskIndex=" + subTaskIndex +
", nextId=" + nextId +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -68,7 +68,7 @@ public LingeringTransactionAborter(
}

public void execute() throws Exception {
Map<ExactlyOnceLabelGenerator.LabelDbTableSubtask, ExactlyOnceLabelGeneratorSnapshot> map = new HashMap<>();
Map<ExactlyOnceLabelGenerator.LabelDbTableSubtask, ExactlyOnceLabelGeneratorSnapshot> map = new LinkedHashMap<>();
Set<String> oldLabelPrefixes = new HashSet<>();
for (ExactlyOnceLabelGeneratorSnapshot snapshot : snapshots) {
// sanity check that the checkpoint id of the snapshot should be the same with the restoredCheckpointId
Expand Down Expand Up @@ -109,7 +109,7 @@ private void abortSnapshotLabelPrefix(List<ExactlyOnceLabelGeneratorSnapshot> sn
long endId = checkNumTxns < 0 ? Long.MAX_VALUE : snapshot.getNextId() + checkNumTxns;
for (long id = snapshot.getNextId(); id < endId; id++) {
String label = ExactlyOnceLabelGenerator.genLabel(
snapshot.getLabelPrefix(), snapshot.getTable(), snapshot.getSubtaskIndex(), id);
snapshot.getLabelPrefix(), snapshot.getTable(), snapshot.getSubTaskIndex(), id);
try {
boolean result = tryAbortTransaction(snapshot.getDb(), snapshot.getTable(), label);
// if checkNumTxns < 0, end up after finding the first transaction that does not need abort
Expand All @@ -132,9 +132,9 @@ private void abortCurrentLabelPrefix() throws Exception {
//TODO considering rescale
// if checkNumTxns is negative, execute until find the first txn that does not need abort
// The start checkpoint
long endId = checkNumTxns < 0 ? Long.MAX_VALUE : restoredCheckpointId + checkNumTxns;
long endId = checkNumTxns < 0 ? Long.MAX_VALUE : restoredCheckpointId + 1 + checkNumTxns;
for (long id = restoredCheckpointId + 1; id < endId; id++) {
String label = ExactlyOnceLabelGenerator.genLabel(dbTable.f0, dbTable.f1, subtaskIndex, id);
String label = ExactlyOnceLabelGenerator.genLabel(currentLabelPrefix, dbTable.f1, subtaskIndex, id);
try {
boolean result = tryAbortTransaction(dbTable.f0, dbTable.f1, label);
// if checkNumTxns < 0, end up after finding the first transaction that does not need abort
Expand Down Expand Up @@ -215,7 +215,7 @@ private boolean tryAbortTransaction(String db, String table, String label) throw
"db: {}, table: {}, label: {}", db, table, label, ie);
}

if (newStatus == null || newStatus == TransactionStatus.PREPARE || newStatus == TransactionStatus.PREPARED) {
if (newStatus != TransactionStatus.UNKNOWN && newStatus != TransactionStatus.ABORTED) {
String errMsg = String.format("Fail to abort lingering transaction, db: %s, table: %s, " +
"label: %s, status: %s", db, table, label, newStatus);
LOG.error(errMsg, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,20 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;

import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable;
import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

@RunWith(Parameterized.class)
public class StarRocksSinkITTest extends StarRocksSinkITTestBase {

@Parameterized.Parameters(name = "isSinkV2={0}")
@Parameterized.Parameters(name = "sinkV2={0}")
public static List<Object> parameters() {
return Arrays.asList(false, true);
}
Expand Down Expand Up @@ -91,7 +92,7 @@ public void testDupKeyWriteFullColumnsOutOfOrder() throws Exception {

@Test
public void testDupKeyWritePartialColumnsInOrder() throws Exception {
assumeFalse(isSinkV2);
assumeTrue(isSinkV2);
String ddl = "c0 INT, c2 STRING";
List<Row> testData = new ArrayList<>();
testData.add(Row.of(1, "abc"));
Expand All @@ -108,7 +109,7 @@ public void testDupKeyWritePartialColumnsInOrder() throws Exception {

@Test
public void testDupKeyWritePartialColumnsOutOfOrder() throws Exception {
assumeFalse(isSinkV2);
assumeTrue(isSinkV2);
String ddl = "c2 STRING, c0 INT";
List<Row> testData = new ArrayList<>();
testData.add(Row.of("abc", 1));
Expand Down Expand Up @@ -144,7 +145,7 @@ private void testDupKeyWriteBase(String flinkDDL, RowTypeInfo rowTypeInfo,
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V1" : "V2") + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
Expand Down Expand Up @@ -212,7 +213,7 @@ public void testPkWriteFullColumnsOutOfOrder() throws Exception {

@Test
public void testPkWritePartialColumnsInOrder() throws Exception {
assumeFalse(isSinkV2);
assumeTrue(isSinkV2);
String ddl = "c0 INT, c2 STRING";
List<Row> testData = new ArrayList<>();
testData.add(Row.of(1, "abc"));
Expand All @@ -229,7 +230,7 @@ public void testPkWritePartialColumnsInOrder() throws Exception {

@Test
public void testPkWritePartialColumnsOutOfOrder() throws Exception {
assumeFalse(isSinkV2);
assumeTrue(isSinkV2);
String ddl = "c2 STRING, c0 INT";
List<Row> testData = new ArrayList<>();
testData.add(Row.of("abc", 1));
Expand Down Expand Up @@ -266,7 +267,7 @@ private void testPkWriteForBase(String flinkDDL, RowTypeInfo rowTypeInfo,
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V1" : "V2") + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
Expand Down Expand Up @@ -316,7 +317,7 @@ public void testPKUpsertAndDelete() throws Exception {
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V1" : "V2") + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
Expand All @@ -335,7 +336,7 @@ public void testPKUpsertAndDelete() throws Exception {

@Test
public void testPKPartialUpdateDelete() throws Exception {
assumeFalse(isSinkV2);
assumeTrue(isSinkV2);
String tableName = createPkTable("testPKPartialUpdateDelete");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (1, 1.0, '1'), (2, 2.0, '2')", DB_NAME, tableName));
verifyResult(Arrays.asList(Arrays.asList(1, 1.0f, "1"), Arrays.asList(2, 2.0f, "2")),
Expand Down Expand Up @@ -369,7 +370,7 @@ public void testPKPartialUpdateDelete() throws Exception {
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V1" : "V2") + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
Expand Down Expand Up @@ -400,9 +401,15 @@ public void testAtLeastOnceWithoutTransaction() throws Exception {

@Test
public void testJsonFormat() throws Exception {
assumeFalse(isSinkV2);
testConfigurationBase(
Collections.singletonMap("sink.properties.format", "json"), env -> null);
if (isSinkV2) {
testConfigurationBase(
Collections.singletonMap("sink.properties.format", "json"), env -> null);
} else {
Map<String, String> map = new HashMap<>();
map.put("sink.properties.format", "json");
map.put("sink.properties.strip_outer_array", "true");
testConfigurationBase(map, env -> null);
}
}

private void testConfigurationBase(Map<String, String> options, Function<StreamExecutionEnvironment, Void> setFlinkEnv) throws Exception {
Expand All @@ -428,7 +435,7 @@ private void testConfigurationBase(Map<String, String> options, Function<StreamE
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", getHttpUrls()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V1" : "V2") + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
Expand Down
Loading

0 comments on commit 8c0d864

Please sign in to comment.