Skip to content

Commit

Permalink
[Bugfix] Fix SINK_AT_LEAST_ONCE_USE_TRANSACTION_LOAD does not take ef…
Browse files Browse the repository at this point in the history
…fect

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Aug 1, 2023
1 parent e1dd01c commit 3e21441
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
Expand All @@ -35,7 +38,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;

import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable;
import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult;
Expand All @@ -51,13 +58,11 @@ public void testDupKeyWriteFullColumnsInOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.INT, Types.FLOAT, Types.STRING},
new String[]{"c0", "c1", "c2"});
String tableName = "testDupKeyWriteFullColumnsInOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testDupKeyWriteBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -69,13 +74,11 @@ public void testDupKeyWriteFullColumnsOutOfOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.STRING, Types.FLOAT, Types.INT},
new String[]{"c2", "c1", "c0"});
String tableName = "testDupKeyWriteFullColumnsOutOfOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testDupKeyWriteBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -87,13 +90,11 @@ public void testDupKeyWritePartialColumnsInOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.INT, Types.STRING},
new String[]{"c0", "c2"});
String tableName = "testDupKeyWritePartialColumnsInOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testDupKeyWriteBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -105,32 +106,16 @@ public void testDupKeyWritePartialColumnsOutOfOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.STRING, Types.INT},
new String[]{"c2", "c0"});
String tableName = "testDupKeyWritePartialColumnsOutOfOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testDupKeyWriteBase(ddl, rowTypeInfo, testData, expectedData);
}

private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo,
private void testDupKeyWriteBase(String flinkDDL, RowTypeInfo rowTypeInfo,
List<Row> testData, List<List<Object>> expectedData) throws Exception {
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);

String tableName = createDupTable("testDupKeyWriteBase");
StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", getJdbcUrl())
.withProperty("load-url", getHttpUrls())
Expand Down Expand Up @@ -163,6 +148,25 @@ private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo
verifyResult(expectedData, actualData);
}

private String createDupTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"DUPLICATE KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testPkWriteFullColumnsInOrder() throws Exception {
String ddl = "c0 INT, c1 FLOAT, c2 STRING";
Expand All @@ -172,13 +176,11 @@ public void testPkWriteFullColumnsInOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.INT, Types.FLOAT, Types.STRING},
new String[]{"c0", "c1", "c2"});
String tableName = "testPkWriteFullColumnsInOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testPkWriteForBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -190,13 +192,11 @@ public void testPkWriteFullColumnsOutOfOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.STRING, Types.FLOAT, Types.INT},
new String[]{"c2", "c1", "c0"});
String tableName = "testPkWriteFullColumnsOutOfOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testPkWriteForBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -208,13 +208,11 @@ public void testPkWritePartialColumnsInOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.INT, Types.STRING},
new String[]{"c0", "c2"});
String tableName = "testPkWritePartialColumnsInOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testPkWriteForBase(ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -226,32 +224,16 @@ public void testPkWritePartialColumnsOutOfOrder() throws Exception {
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[]{Types.STRING, Types.INT},
new String[]{"c2", "c0"});
String tableName = "testPkWritePartialColumnsOutOfOrder_" + genRandomUuid();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
testPkWriteForBase(ddl, rowTypeInfo, testData, expectedData);
}

private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo,
private void testPkWriteForBase(String flinkDDL, RowTypeInfo rowTypeInfo,
List<Row> testData, List<List<Object>> expectedData) throws Exception {
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);

String tableName = createPkTable("testPkWriteForBase");
StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", getJdbcUrl())
.withProperty("load-url", getHttpUrls())
Expand Down Expand Up @@ -287,22 +269,7 @@ private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo

@Test
public void testPKUpsertAndDelete() throws Exception {
String tableName = "testPKUpsertAndDelete_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);

String tableName = createPkTable("testPKUpsertAndDelete");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv;
Expand Down Expand Up @@ -353,21 +320,7 @@ public void testPKUpsertAndDelete() throws Exception {

@Test
public void testPKPartialUpdateDelete() throws Exception {
String tableName = "testPKPartialUpdateDelete_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
String tableName = createPkTable("testPKPartialUpdateDelete");
executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (1, 1.0, '1')", DB_NAME, tableName));
verifyResult(Collections.singletonList(Arrays.asList(1, 1.0f, "1")), scanTable(DB_CONNECTION, DB_NAME, tableName));

Expand Down Expand Up @@ -412,4 +365,87 @@ public void testPKPartialUpdateDelete() throws Exception {
List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

@Test
public void testAtLeastOnceWithTransaction() throws Exception {
testConfigurationBase(Collections.emptyMap(), env -> null);
}

@Test
public void testAtLeastOnceWithoutTransaction() throws Exception {
testConfigurationBase(
Collections.singletonMap("sink.at-least-once.use-transaction-stream-load", "false"), env -> null);
}

@Test
public void testExactlyOnce() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("sink.at-least-once.use-transaction-stream-load", "false");
options.put("sink.semantic", "exactly-once");
String checkpointDir = temporaryFolder.newFolder().toURI().toString();
testConfigurationBase(options,
env -> {
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
return null;
}
);
}

private void testConfigurationBase(Map<String, String> options, Function<StreamExecutionEnvironment, Void> setFlinkEnv) throws Exception {
String tableName = createPkTable("testAtLeastOnceBase");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
setFlinkEnv.apply(env);

StreamTableEnvironment tEnv;
tEnv = StreamTableEnvironment.create(env);

StringJoiner joiner = new StringJoiner(",\n");
for (Map.Entry<String, String> entry : options.entrySet()) {
joiner.add(String.format("'%s' = '%s'", entry.getKey(), entry.getValue()));
}
String optionStr = joiner.toString();
String createSQL = "CREATE TABLE sink(" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING," +
"PRIMARY KEY (`c0`) NOT ENFORCED" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", getHttpUrls()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
(optionStr.isEmpty() ? "" : "," + optionStr) +
")";
tEnv.executeSql(createSQL);
tEnv.executeSql("INSERT INTO sink VALUES (1, 1.0, '1')").await();
List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(Collections.singletonList(Arrays.asList(1, 1.0, '1')), actualData);
}

private String createPkTable(String tablePrefix) throws Exception {
String tableName = tablePrefix + "_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);
return tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +37,10 @@ public abstract class StarRocksSinkITTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTestBase.class);

private static final boolean DEBUG_MODE = false;

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

protected static String DB_NAME;
protected static String HTTP_URLS;
protected static String JDBC_URLS;
Expand All @@ -48,7 +54,6 @@ protected static String getJdbcUrl() {
}

protected static Connection DB_CONNECTION;

@BeforeClass
public static void setUp() throws Exception {
HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public StreamLoadManagerV2(StreamLoadProperties properties, boolean enableAutoCo
retryIntervalInMs = 0;
} else {
// TODO transaction stream load can't support retry currently
streamLoader = properties.getMaxRetries() > 0 ? new DefaultStreamLoader() : new TransactionStreamLoader();
streamLoader = (properties.getMaxRetries() > 0 || !properties.isEnableTransaction())
? new DefaultStreamLoader() : new TransactionStreamLoader();
maxRetries = properties.getMaxRetries();
retryIntervalInMs = properties.getRetryIntervalInMs();
}
Expand Down

0 comments on commit 3e21441

Please sign in to comment.