Skip to content

Commit

Permalink
Skip non primary key table
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jul 5, 2023
1 parent 19225d8 commit 66f84b9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public synchronized void invoke(T value, Context context) throws Exception {
}
}
if (value instanceof RowData) {
if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) &&
(!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) {
return;
}
if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public void invoke(T value, Context context) throws Exception {
}
}
if (value instanceof RowData) {
if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) &&
(!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) {
return;
}
if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {

private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);

private static final boolean DEBUG_MODE = false;
private static final boolean DEBUG_MODE = true;

private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
private static final int zkTimeoutMills = 30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

Expand All @@ -51,104 +52,37 @@ public void before() {

@Test
public void testUpsertAndDelete() throws Exception {
// create kafka topic and write data
final String topic = "upsert_and_delete_topic";
createTestTopic(topic, 1, 1);
List<String> lines = readLines("data/debezium-upsert-and-delete.txt");
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}

// create SR table
String tableName = "testUpsertAndDelete_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
"c0 INT," +
"c1 FLOAT," +
"c2 STRING" +
") ENGINE = OLAP " +
"PRIMARY KEY(c0) " +
"DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
"PROPERTIES (" +
"\"replication_num\" = \"1\"" +
")",
SR_DB_NAME, tableName);
executeSrSQL(createStarRocksTable);

// ---------- Produce an event time stream into Kafka -------------------
String bootstraps = getBootstrapServers();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " c0 INT NOT NULL,"
+ " c1 FLOAT,"
+ " c2 STRING,"
+ " PRIMARY KEY (c0) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'kafka',"
+ " 'topic' = '%s',"
+ " 'properties.bootstrap.servers' = '%s',"
+ " 'scan.startup.mode' = 'earliest-offset',"
+ " 'value.format' = 'debezium-json'"
+ ")",
topic, bootstraps);
String sinkDDL =
String.format(
"CREATE TABLE sink("
+ "c0 INT,"
+ "c1 FLOAT,"
+ "c2 STRING,"
+ "PRIMARY KEY (`c0`) NOT ENFORCED"
+ ") WITH ( "
+ "'connector' = 'starrocks',"
+ "'jdbc-url'='%s',"
+ "'load-url'='%s',"
+ "'database-name' = '%s',"
+ "'table-name' = '%s',"
+ "'username' = '%s',"
+ "'password' = '%s',"
+ "'sink.buffer-flush.interval-ms' = '1000'"
+ ")",
getSrJdbcUrl(), getSrHttpUrls(), SR_DB_NAME, tableName, "root", "");

tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// TODO find an elegant way to wait for the result
try {
Thread.sleep(10000);
} catch (Exception e) {
// ignore
}
tableResult.getJobClient().get().cancel().get(); // stop the job

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(1, 2.0f, "row1"),
Arrays.asList(2, 2.0f, "row2")
);
List<List<Object>> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName);
verifyResult(expectedData, actualData);

deleteTestTopic(topic);
testPrimaryKeyBase(
"upsert_and_delete_topic",
"data/debezium-upsert-and-delete.txt",
Arrays.asList(
Arrays.asList(1, 2.0f, "row1"),
Arrays.asList(2, 2.0f, "row2")
)
);
}

@Test
public void testUpdateWithPkChanged() throws Exception {
testPrimaryKeyBase(
"update_with_pk_changed_topic",
"data/debezium-update-with-pk-changed.txt",
Collections.singletonList(Arrays.asList(2, 2.0f, "row2"))
);
}

private void testPrimaryKeyBase(String topic, String dataFile, List<List<Object>> expectedData) throws Exception {
// create kafka topic and write data
final String topic = "update_with_pk_changed_topic";
createTestTopic(topic, 1, 1);
List<String> lines = readLines("data/debezium-update-with-pk-changed.txt");
List<String> lines = readLines(dataFile);
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}

// create SR table
String tableName = "testUpsertAndDelete_" + genRandomUuid();
String tableName = "testPrimaryKeyBase_" + genRandomUuid();
String createStarRocksTable =
String.format(
"CREATE TABLE `%s`.`%s` (" +
Expand Down Expand Up @@ -210,10 +144,6 @@ public void testUpdateWithPkChanged() throws Exception {
// ignore
}
tableResult.getJobClient().get().cancel().get(); // stop the job

List<List<Object>> expectedData = Arrays.asList(
Arrays.asList(2, 2.0f, "row2")
);
List<List<Object>> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName);
verifyResult(expectedData, actualData);

Expand Down

0 comments on commit 66f84b9

Please sign in to comment.