diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
index b855b238..337994b1 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunction.java
@@ -146,7 +146,8 @@ public synchronized void invoke(T value, Context context) throws Exception {
             }
         }
         if (value instanceof RowData) {
-            if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
+            if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) &&
+                    (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) {
                 return;
             }
             if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
index 0e2f9f3b..74c5528d 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
@@ -150,7 +150,8 @@ public void invoke(T value, Context context) throws Exception {
             }
         }
         if (value instanceof RowData) {
-            if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
+            if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) &&
+                    (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) {
                 return;
             }
             if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
index 2f632d8b..3b39447b 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java
@@ -69,7 +69,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
 
-    private static final boolean DEBUG_MODE = false;
+    private static final boolean DEBUG_MODE = true;
     private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     private static final int zkTimeoutMills = 30000;
 
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
index f0c21a06..5d443725 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java
@@ -32,6 +32,7 @@
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -51,96 +52,29 @@ public void before() {
     }
     @Test
     public void testUpsertAndDelete() throws Exception {
-        // create kafka topic and write data
-        final String topic = "upsert_and_delete_topic";
-        createTestTopic(topic, 1, 1);
-        List<String> lines = readLines("data/debezium-upsert-and-delete.txt");
-        try {
-            writeRecordsToKafka(topic, lines);
-        } catch (Exception e) {
-            throw new Exception("Failed to write debezium data to Kafka.", e);
-        }
-
-        // create SR table
-        String tableName = "testUpsertAndDelete_" + genRandomUuid();
-        String createStarRocksTable =
-                String.format(
-                        "CREATE TABLE `%s`.`%s` (" +
-                                "c0 INT," +
-                                "c1 FLOAT," +
-                                "c2 STRING" +
-                                ") ENGINE = OLAP " +
-                                "PRIMARY KEY(c0) " +
-                                "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
-                                "PROPERTIES (" +
-                                "\"replication_num\" = \"1\"" +
-                                ")",
-                        SR_DB_NAME, tableName);
-        executeSrSQL(createStarRocksTable);
-
-        // ---------- Produce an event time stream into Kafka -------------------
-        String bootstraps = getBootstrapServers();
-        String sourceDDL =
-                String.format(
-                        "CREATE TABLE debezium_source ("
-                                + " c0 INT NOT NULL,"
-                                + " c1 FLOAT,"
-                                + " c2 STRING,"
-                                + " PRIMARY KEY (c0) NOT ENFORCED"
-                                + ") WITH ("
-                                + " 'connector' = 'kafka',"
-                                + " 'topic' = '%s',"
-                                + " 'properties.bootstrap.servers' = '%s',"
-                                + " 'scan.startup.mode' = 'earliest-offset',"
-                                + " 'value.format' = 'debezium-json'"
-                                + ")",
-                        topic, bootstraps);
-        String sinkDDL =
-                String.format(
-                        "CREATE TABLE sink("
-                                + "c0 INT,"
-                                + "c1 FLOAT,"
-                                + "c2 STRING,"
-                                + "PRIMARY KEY (`c0`) NOT ENFORCED"
-                                + ") WITH ( "
-                                + "'connector' = 'starrocks',"
-                                + "'jdbc-url'='%s',"
-                                + "'load-url'='%s',"
-                                + "'database-name' = '%s',"
-                                + "'table-name' = '%s',"
-                                + "'username' = '%s',"
-                                + "'password' = '%s',"
-                                + "'sink.buffer-flush.interval-ms' = '1000'"
-                                + ")",
-                        getSrJdbcUrl(), getSrHttpUrls(), SR_DB_NAME, tableName, "root", "");
-
-        tEnv.executeSql(sourceDDL);
-        tEnv.executeSql(sinkDDL);
-        TableResult tableResult = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
-        // TODO find an elegant way to wait for the result
-        try {
-            Thread.sleep(10000);
-        } catch (Exception e) {
-            // ignore
-        }
-        tableResult.getJobClient().get().cancel().get(); // stop the job
-
-        List<List<Object>> expectedData = Arrays.asList(
-                Arrays.asList(1, 2.0f, "row1"),
-                Arrays.asList(2, 2.0f, "row2")
-        );
-        List<List<Object>> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName);
-        verifyResult(expectedData, actualData);
-
-        deleteTestTopic(topic);
+        testPrimaryKeyBase(
+                "upsert_and_delete_topic",
+                "data/debezium-upsert-and-delete.txt",
+                Arrays.asList(
+                        Arrays.asList(1, 2.0f, "row1"),
+                        Arrays.asList(2, 2.0f, "row2")
+                )
+        );
     }
 
     @Test
     public void testUpdateWithPkChanged() throws Exception {
+        testPrimaryKeyBase(
+                "update_with_pk_changed_topic",
+                "data/debezium-update-with-pk-changed.txt",
+                Collections.singletonList(Arrays.asList(2, 2.0f, "row2"))
+        );
+    }
+
+    private void testPrimaryKeyBase(String topic, String dataFile, List<List<Object>> expectedData) throws Exception {
         // create kafka topic and write data
-        final String topic = "update_with_pk_changed_topic";
         createTestTopic(topic, 1, 1);
-        List<String> lines = readLines("data/debezium-update-with-pk-changed.txt");
+        List<String> lines = readLines(dataFile);
         try {
             writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
@@ -148,7 +82,7 @@ public void testUpdateWithPkChanged() throws Exception {
         }
 
         // create SR table
-        String tableName = "testUpsertAndDelete_" + genRandomUuid();
+        String tableName = "testPrimaryKeyBase_" + genRandomUuid();
         String createStarRocksTable =
                 String.format(
                         "CREATE TABLE `%s`.`%s` (" +
@@ -210,10 +144,6 @@ public void testUpdateWithPkChanged() throws Exception {
             // ignore
         }
         tableResult.getJobClient().get().cancel().get(); // stop the job
-
-        List<List<Object>> expectedData = Arrays.asList(
-                Arrays.asList(2, 2.0f, "row2")
-        );
         List<List<Object>> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName);
         verifyResult(expectedData, actualData);