Skip to content

Commit

Permalink
[UT] Add test for upsert and delete for pk
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jul 3, 2023
1 parent 883cc27 commit 32524e7
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package com.starrocks.connector.flink.it.sink;

import com.starrocks.connector.flink.StarRocksSinkBaseTest;

import mockit.Expectations;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -28,6 +28,7 @@

import static org.junit.Assert.assertFalse;

@Ignore
public class StarRocksDynamicTableSinkITTest extends StarRocksSinkBaseTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,24 @@

package com.starrocks.connector.flink.it.sink;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import mockit.Expectations;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.junit.Ignore;
import org.junit.Test;

import mockit.Expectations;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.StarRocksSinkBaseTest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

class TestEntry implements Serializable {

Expand All @@ -47,6 +46,7 @@ public TestEntry(Integer score, String name) {
}
}

@Ignore
public class StarRocksGenericSinkITTest extends StarRocksSinkBaseTest {

private final TestEntry[] TEST_DATA = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,56 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;

@Ignore
public class StarRocksSinkTest {
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assume.assumeTrue;

private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkTest.class);
public class StarRocksSinkITTest {

private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTest.class);

private static String DB_NAME;
private static String HTTP_URLS = null; // "127.0.0.1:11901";
private static String JDBC_URLS = null; // "jdbc:mysql://127.0.0.1:11903";

private static String getHttpUrls() {
return "127.0.0.1:11901";
return HTTP_URLS;
}

private static String getJdbcUrl() {
return "jdbc:mysql://127.0.0.1:11903";
return JDBC_URLS;
}

private static Connection DB_CONNECTION;

@BeforeClass
public static void setUp() throws Exception {
HTTP_URLS = System.getProperty("http_urls");
JDBC_URLS = System.getProperty("jdbc_urls");
assumeTrue(HTTP_URLS != null && JDBC_URLS != null);

DB_NAME = "sr_sink_test_" + genRandomUuid();
try {
DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", "");
Expand Down Expand Up @@ -114,7 +127,12 @@ public void testDupKeyWriteFullColumnsInOrder() throws Exception {
new TypeInformation[]{Types.INT, Types.FLOAT, Types.STRING},
new String[]{"c0", "c1", "c2"});
String tableName = "testDupKeyWriteFullColumnsInOrder_" + genRandomUuid();
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -127,7 +145,12 @@ public void testDupKeyWriteFullColumnsOutOfOrder() throws Exception {
new TypeInformation[]{Types.STRING, Types.FLOAT, Types.INT},
new String[]{"c2", "c1", "c0"});
String tableName = "testDupKeyWriteFullColumnsOutOfOrder_" + genRandomUuid();
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -140,7 +163,12 @@ public void testDupKeyWritePartialColumnsInOrder() throws Exception {
new TypeInformation[]{Types.INT, Types.STRING},
new String[]{"c0", "c2"});
String tableName = "testDupKeyWritePartialColumnsInOrder_" + genRandomUuid();
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -153,10 +181,16 @@ public void testDupKeyWritePartialColumnsOutOfOrder() throws Exception {
new TypeInformation[]{Types.STRING, Types.INT},
new String[]{"c2", "c0"});
String tableName = "testDupKeyWritePartialColumnsOutOfOrder_" + genRandomUuid();
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testDupKeyWriteBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo, List<Row> testData) throws Exception {
private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo,
List<Row> testData, List<List<Object>> expectedData) throws Exception {
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
Expand Down Expand Up @@ -199,8 +233,9 @@ private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo
DataStream<Row> srcDs = env.fromCollection(testData).returns(rowTypeInfo);
Table in = tEnv.fromDataStream(srcDs);
tEnv.createTemporaryView("src", in);
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM src");
result.await();
tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();
List<List<Object>> actualData = scanTable(DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

@Test
Expand All @@ -213,7 +248,12 @@ public void testPkWriteFullColumnsInOrder() throws Exception {
new TypeInformation[]{Types.INT, Types.FLOAT, Types.STRING},
new String[]{"c0", "c1", "c2"});
String tableName = "testPkWriteFullColumnsInOrder_" + genRandomUuid();
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -226,7 +266,12 @@ public void testPkWriteFullColumnsOutOfOrder() throws Exception {
new TypeInformation[]{Types.STRING, Types.FLOAT, Types.INT},
new String[]{"c2", "c1", "c0"});
String tableName = "testPkWriteFullColumnsOutOfOrder_" + genRandomUuid();
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 10.1f, "abc"),
Arrays.asList(2, 20.2f, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -239,7 +284,12 @@ public void testPkWritePartialColumnsInOrder() throws Exception {
new TypeInformation[]{Types.INT, Types.STRING},
new String[]{"c0", "c2"});
String tableName = "testPkWritePartialColumnsInOrder_" + genRandomUuid();
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

@Test
Expand All @@ -252,10 +302,16 @@ public void testPkWritePartialColumnsOutOfOrder() throws Exception {
new TypeInformation[]{Types.STRING, Types.INT},
new String[]{"c2", "c0"});
String tableName = "testPkWritePartialColumnsOutOfOrder_" + genRandomUuid();
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData);

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, null, "abc"),
Arrays.asList(2, null, "def")
);
testPkWriteForBase(tableName, ddl, rowTypeInfo, testData, expectedData);
}

private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo, List<Row> testData) throws Exception {
private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo rowTypeInfo,
List<Row> testData, List<List<Object>> expectedData) throws Exception {
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
Expand Down Expand Up @@ -299,7 +355,115 @@ private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo
DataStream<Row> srcDs = env.fromCollection(testData).returns(rowTypeInfo);
Table in = tEnv.fromDataStream(srcDs);
tEnv.createTemporaryView("src", in);
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM src");
result.await();
tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();
List<List<Object>> actualData = scanTable(DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

@Test
public void testPKUpsertAndDelete() throws Exception {
String tableName = "testPKUpsertAndDelete_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSRDDLSQL(createStarRocksTable);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv;
tEnv = StreamTableEnvironment.create(env);
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, 1, 1.0f, "row1"),
Row.ofKind(RowKind.INSERT, 2, 2.0f, "row2"),
Row.ofKind(RowKind.UPDATE_AFTER, 1, 3.0f, "row3"),
Row.ofKind(RowKind.UPDATE_AFTER, 3, 3.0f, "row3"),
Row.ofKind(RowKind.DELETE, 2, 2.0f, "row2"));
Table table = tEnv.fromChangelogStream(dataStream, Schema.newBuilder().build(), ChangelogMode.all());
tEnv.createTemporaryView("src", table);

StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", getJdbcUrl())
.withProperty("load-url", getHttpUrls())
.withProperty("database-name", DB_NAME)
.withProperty("table-name", tableName)
.withProperty("username", "root")
.withProperty("password", "")
.build();

String createSQL = "CREATE TABLE sink(" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING," +
"PRIMARY KEY (`c0`) NOT ENFORCED" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
"'password' = '" + sinkOptions.getPassword() + "'" +
")";
tEnv.executeSql(createSQL);
tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 3.0f, "row3"),
Arrays.asList(3, 3.0f, "row3")
);
List<List<Object>> actualData = scanTable(DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

// Scan table, and returns a collection of rows
private static List<List<Object>> scanTable(String db, String table) throws SQLException {
try (PreparedStatement statement = DB_CONNECTION.prepareStatement(String.format("SELECT * FROM `%s`.`%s`", db, table))) {
try (ResultSet resultSet = statement.executeQuery()) {
List<List<Object>> results = new ArrayList<>();
int numColumns = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
List<Object> row = new ArrayList<>();
for (int i = 1; i <= numColumns; i++) {
row.add(resultSet.getObject(i));
}
results.add(row);
}
return results;
}
}
}

private static void verifyResult(List<List<Object>> expected, List<List<Object>> actual) {
List<String> expectedRows = new ArrayList<>();
List<String> actualRows = new ArrayList<>();
for (List<Object> row : expected) {
StringJoiner joiner = new StringJoiner(",");
for (Object col : row) {
joiner.add(col == null ? "null" : col.toString());
}
expectedRows.add(joiner.toString());
}
expectedRows.sort(String::compareTo);

for (List<Object> row : actual) {
StringJoiner joiner = new StringJoiner(",");
for (Object col : row) {
joiner.add(col == null ? "null" : col.toString());
}
actualRows.add(joiner.toString());
}
actualRows.sort(String::compareTo);
assertArrayEquals(expectedRows.toArray(), actualRows.toArray());
}
}
Loading

0 comments on commit 32524e7

Please sign in to comment.