Skip to content

Commit

Permalink
[Bugfix] Fix largeint/json fails to write
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Sep 4, 2023
1 parent eaa1c65 commit 3e899fd
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ public void validateTableStructure(StarRocksSinkOptions sinkOptions, TableSchema
}

String srType = srColumn.get("DATA_TYPE").toString().toLowerCase();
boolean typeMatched = typesMap.containsKey(srType) &&
typesMap.get(srType).contains(column.getType().getLogicalType().getTypeRoot());
// Some types of StarRocks, such as json, are not mapped to Flink natively,
// and there will be no entry in typesMap, but they can be represented as
// STRING in Flink generally, so we think the type is matched even if
// typesMap does not contain the srType
boolean typeMatched = !typesMap.containsKey(srType) || typesMap.get(srType).contains(column.getType().getLogicalType().getTypeRoot());
if (!typeMatched) {
throw new IllegalArgumentException(
String.format("Flink and StarRocks types are not matched for column %s, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,57 @@ private String createPkTable(String tablePrefix) throws Exception {
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testUnalignedTypes() throws Exception {
String tableName = "testUnalignedTypes_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 LARGEINT," +
"c2 JSON" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
DB_NAME, tableName);
executeSrSQL(createStarRocksTable);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", getJdbcUrl())
.withProperty("load-url", getHttpUrls())
.withProperty("database-name", DB_NAME)
.withProperty("table-name", tableName)
.withProperty("username", "root")
.withProperty("password", "")
.build();

String createSQL = "CREATE TABLE sink(" +
"c0 INT," +
"c1 STRING," +
"c2 STRING," +
"PRIMARY KEY (`c0`) NOT ENFORCED" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
"'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
"'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
"'database-name' = '" + DB_NAME + "'," +
"'table-name' = '" + sinkOptions.getTableName() + "'," +
"'username' = '" + sinkOptions.getUsername() + "'," +
"'password' = '" + sinkOptions.getPassword() + "'" +
")";

tEnv.executeSql(createSQL);
tEnv.executeSql("INSERT INTO sink VALUES (1, '123', '{\"key\": 1, \"value\": 2}')").await();
List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(Collections.singletonList(Arrays.asList(1, "123", "{\"key\": 1, \"value\": 2}")), actualData);
}
}

0 comments on commit 3e899fd

Please sign in to comment.