[Feature] Support to deal with update_before record
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Jul 5, 2023
1 parent e3233f0 commit 19225d8
Showing 6 changed files with 103 additions and 4 deletions.
@@ -146,8 +146,7 @@ public synchronized void invoke(T value, Context context) throws Exception {
         }
     }
     if (value instanceof RowData) {
-        if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind())) {
-            // do not need update_before, cauz an update action happened on the primary keys will be separated into `delete` and `create`
+        if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
             return;
         }
         if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
@@ -150,8 +150,7 @@ public void invoke(T value, Context context) throws Exception {
         }
     }
     if (value instanceof RowData) {
-        if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind())) {
-            // do not need update_before, cauz an update action happened on the primary keys will be separated into `delete` and `create`
+        if (RowKind.UPDATE_BEFORE.equals(((RowData)value).getRowKind()) && sinkOptions.getIgnoreUpdateBefore()) {
             return;
         }
         if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData)value).getRowKind())) {
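Both hunks add the same guard to the two sink implementations. For readers skimming the diff, a minimal self-contained sketch of that logic; this is illustrative only: IGNORE_UPDATE_BEFORE stands in for sinkOptions.getIgnoreUpdateBefore(), and shouldSkip and the sample rows are hypothetical, not connector code.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class UpdateBeforeFilterSketch {
    // Stand-in for sinkOptions.getIgnoreUpdateBefore(); the option defaults to true.
    private static final boolean IGNORE_UPDATE_BEFORE = true;

    // Mirrors the new guard in invoke(): an UPDATE_BEFORE row is dropped only
    // when the option allows it; otherwise it flows on and becomes a delete.
    static boolean shouldSkip(RowData row) {
        return RowKind.UPDATE_BEFORE.equals(row.getRowKind()) && IGNORE_UPDATE_BEFORE;
    }

    public static void main(String[] args) {
        RowData before = GenericRowData.ofKind(
                RowKind.UPDATE_BEFORE, 1, StringData.fromString("row1"));
        RowData after = GenericRowData.ofKind(
                RowKind.UPDATE_AFTER, 1, StringData.fromString("row1-new"));
        System.out.println(shouldSkip(before)); // true  -> not sent to StarRocks
        System.out.println(shouldSkip(after));  // false -> written as an upsert
    }
}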
@@ -74,6 +74,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(StarRocksSinkOptions.SINK_IO_THREAD_COUNT);
         optionalOptions.add(StarRocksSinkOptions.SINK_CHUNK_LIMIT);
         optionalOptions.add(StarRocksSinkOptions.SINK_SCAN_FREQUENCY);
+        optionalOptions.add(StarRocksSinkOptions.SINK_IGNORE_UPDATE_BEFORE);
         return optionalOptions;
     }
 }
@@ -110,6 +110,15 @@ public enum StreamLoadFormat {
     public static final ConfigOption<Integer> SINK_METRIC_HISTOGRAM_WINDOW_SIZE = ConfigOptions.key("sink.metric.histogram-window-size")
             .intType().defaultValue(100).withDescription("Window size of histogram metrics.");
 
+    public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE = ConfigOptions.key("sink.ignore.update-before")
+            .booleanType().defaultValue(true).withDescription("Whether to ignore update_before records. In general, update_before " +
+                    "and update_after records share the same primary key and appear in pairs, so the connector only needs to write " +
+                    "the update_after record to StarRocks and can ignore the update_before. But that is not always the case: if the " +
+                    "user updates a row in the OLTP database and changes its primary key, Flink CDC will generate before and after " +
+                    "records with different primary keys. The connector should then delete the update_before row and insert the " +
+                    "update_after row in StarRocks, so this option should be set to false for that case. How to set this option " +
+                    "depends on the use case.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // Sink semantic
@@ -244,6 +253,10 @@ public Integer getSinkParallelism() {
         return tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
     }
 
+    public boolean getIgnoreUpdateBefore() {
+        return tableOptions.get(SINK_IGNORE_UPDATE_BEFORE);
+    }
+
     public static Builder builder() {
         return new Builder();
     }
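A hedged usage sketch of the new option: a Flink SQL sink DDL, built the same way the test below builds its DDL, that sets sink.ignore.update-before to false so a primary-key change deletes the old row. The angle-bracket values are placeholders, and the Java scaffolding around the DDL is illustrative, not part of the commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IgnoreUpdateBeforeUsageSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE sink(" +
                "  c0 INT," +
                "  c1 FLOAT," +
                "  c2 STRING," +
                "  PRIMARY KEY (`c0`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'starrocks'," +
                "  'jdbc-url' = '<jdbc-url>'," +
                "  'load-url' = '<load-url>'," +
                "  'database-name' = '<database>'," +
                "  'table-name' = '<table>'," +
                "  'username' = '<username>'," +
                "  'password' = '<password>'," +
                // The new option: keep update_before records so a changed
                // primary key deletes the old row before inserting the new one.
                "  'sink.ignore.update-before' = 'false'" +
                ")");
    }
}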
@@ -135,6 +135,91 @@ public void testUpsertAndDelete() throws Exception {
         deleteTestTopic(topic);
     }
+
+    @Test
+    public void testUpdateWithPkChanged() throws Exception {
+        // create kafka topic and write data
+        final String topic = "update_with_pk_changed_topic";
+        createTestTopic(topic, 1, 1);
+        List<String> lines = readLines("data/debezium-update-with-pk-changed.txt");
+        try {
+            writeRecordsToKafka(topic, lines);
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+
+        // create SR table
+        String tableName = "testUpdateWithPkChanged_" + genRandomUuid();
+        String createStarRocksTable =
+                String.format(
+                        "CREATE TABLE `%s`.`%s` (" +
+                                "c0 INT," +
+                                "c1 FLOAT," +
+                                "c2 STRING" +
+                                ") ENGINE = OLAP " +
+                                "PRIMARY KEY(c0) " +
+                                "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
+                                "PROPERTIES (" +
+                                "\"replication_num\" = \"1\"" +
+                                ")",
+                        SR_DB_NAME, tableName);
+        executeSrSQL(createStarRocksTable);
+
+        // ---------- Read the Debezium changelog from Kafka and write it to StarRocks ----------
+        String bootstraps = getBootstrapServers();
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " c0 INT NOT NULL,"
+                                + " c1 FLOAT,"
+                                + " c2 STRING,"
+                                + " PRIMARY KEY (c0) NOT ENFORCED"
+                                + ") WITH ("
+                                + " 'connector' = 'kafka',"
+                                + " 'topic' = '%s',"
+                                + " 'properties.bootstrap.servers' = '%s',"
+                                + " 'scan.startup.mode' = 'earliest-offset',"
+                                + " 'value.format' = 'debezium-json'"
+                                + ")",
+                        topic, bootstraps);
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE sink("
+                                + "c0 INT,"
+                                + "c1 FLOAT,"
+                                + "c2 STRING,"
+                                + "PRIMARY KEY (`c0`) NOT ENFORCED"
+                                + ") WITH ( "
+                                + "'connector' = 'starrocks',"
+                                + "'jdbc-url'='%s',"
+                                + "'load-url'='%s',"
+                                + "'database-name' = '%s',"
+                                + "'table-name' = '%s',"
+                                + "'username' = '%s',"
+                                + "'password' = '%s',"
+                                + "'sink.buffer-flush.interval-ms' = '1000',"
+                                // required so the update_before row deletes the old primary key
+                                + "'sink.ignore.update-before' = 'false'"
+                                + ")",
+                        getSrJdbcUrl(), getSrHttpUrls(), SR_DB_NAME, tableName, "root", "");
+
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+        TableResult tableResult = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+        // TODO find an elegant way to wait for the result
+        try {
+            Thread.sleep(10000);
+        } catch (Exception e) {
+            // ignore
+        }
+        tableResult.getJobClient().get().cancel().get(); // stop the job
+
+        List<List<Object>> expectedData = Arrays.asList(
+                Arrays.asList(2, 2.0f, "row2")
+        );
+        List<List<Object>> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName);
+        verifyResult(expectedData, actualData);
+
+        deleteTestTopic(topic);
+    }

     private void writeRecordsToKafka(String topic, List<String> lines) throws Exception {
         DataStreamSource<String> stream = env.fromCollection(lines);
         SerializationSchema<String> serSchema = new SimpleStringSchema();
src/test/resources/data/debezium-update-with-pk-changed.txt (2 additions, 0 deletions)
@@ -0,0 +1,2 @@
{"op":"c","ts_ms":1589355606100,"before":null,"after":{"c0":1,"c1":1.0,"c2":"row1"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null}
{"op":"u","ts_ms":1589355606400,"before":{"c0":1,"c1":1.0,"c2":"row1"},"after":{"c0":2,"c1":2.0,"c2":"row2"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null}
