From 6d595c8080053570d335fcb63ff66e65ef9254fd Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Wed, 5 Jul 2023 00:55:34 +0800 Subject: [PATCH] [UT] Add IT framework for pipeline from kafka to starrocks Signed-off-by: PengFei Li --- pom.xml | 173 +++++++++- .../flink/it/sink/StarRocksSinkITTest.java | 131 +------- .../it/sink/StarRocksSinkITTestBase.java | 100 ++++++ .../flink/it/sink/StarRocksTableUtils.java | 73 +++++ .../it/sink/kafka/KafkaTableTestBase.java | 303 ++++++++++++++++++ .../it/sink/kafka/KafkaTableTestUtils.java | 113 +++++++ .../it/sink/kafka/KafkaToStarRocksITTest.java | 162 ++++++++++ .../data/debezium-upsert-and-delete.txt | 5 + src/test/resources/log4j.properties | 30 ++ 9 files changed, 949 insertions(+), 141 deletions(-) create mode 100644 src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java create mode 100644 src/test/java/com/starrocks/connector/flink/it/sink/StarRocksTableUtils.java create mode 100644 src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java create mode 100644 src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java create mode 100644 src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java create mode 100644 src/test/resources/data/debezium-upsert-and-delete.txt create mode 100644 src/test/resources/log4j.properties diff --git a/pom.xml b/pom.xml index ce185ecb..d879e22b 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,8 @@ limitations under the License. 3.0.0-M4 1.15.0 5.0.0 + 2.8.1 + 2.12 @@ -66,18 +68,17 @@ limitations under the License. jar-with-dependencies - org.apache.flink - flink-table-planner-loader + flink-table-planner_${scala.binary.version} ${flink.version} provided - + org.apache.flink flink-table-runtime - 1.15.0 + ${flink.version} provided @@ -153,25 +154,162 @@ limitations under the License. 
arrow-memory-netty ${arrow.version} + + org.apache.flink - flink-test-utils + flink-connector-kafka + ${flink.version} + test + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-core + ${flink.version} + test + test-jar + + + + org.apache.flink + flink-streaming-java ${flink.version} test + test-jar + + + + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka.version} - - flink-runtime - org.apache.flink - - - flink-clients - org.apache.flink - - - curator-test - org.apache.curator - + + org.slf4j + slf4j-api + + test + + + + org.apache.zookeeper + zookeeper + 3.5.9 + test + + + + org.testcontainers + kafka + 1.18.3 + test + + + + org.testcontainers + junit-jupiter + 1.18.3 + test + + + + org.apache.flink + flink-tests + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-runtime + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-metrics-jmx + ${flink.version} + test + + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-scala-bridge_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + + org.apache.flink + flink-json + ${flink.version} + test + + + + org.apache.flink + flink-avro + ${flink.version} + test + + + + org.apache.flink + flink-avro-confluent-registry + ${flink.version} + test + + + + org.apache.flink + flink-csv + ${flink.version} + test @@ -187,6 +325,7 @@ limitations under the License. test + diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 7993408a..00cc0ae3 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -30,92 +30,16 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.StringJoiner; -import java.util.UUID; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assume.assumeTrue; - -public class StarRocksSinkITTest { - - private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTest.class); - - private static String DB_NAME; - private static String HTTP_URLS = null; // "127.0.0.1:11901"; - private static String JDBC_URLS = null; // "jdbc:mysql://127.0.0.1:11903"; - private static String getHttpUrls() { - return HTTP_URLS; - } +import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable; +import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult; - private static String getJdbcUrl() { - return JDBC_URLS; - } - - private static Connection DB_CONNECTION; - - @BeforeClass - public static void setUp() throws Exception { - HTTP_URLS = 
System.getProperty("http_urls"); - JDBC_URLS = System.getProperty("jdbc_urls"); - assumeTrue(HTTP_URLS != null && JDBC_URLS != null); - - DB_NAME = "sr_sink_test_" + genRandomUuid(); - try { - DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); - LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); - } catch (Exception e) { - LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); - throw e; - } - - try { - String createDb = "CREATE DATABASE " + DB_NAME; - executeSRDDLSQL(createDb); - LOG.info("Successful to create database {}", DB_NAME); - } catch (Exception e) { - LOG.error("Failed to create database {}", DB_NAME, e); - throw e; - } - } - - @AfterClass - public static void tearDown() throws Exception { - if (DB_CONNECTION != null) { - try { - String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", DB_NAME); - executeSRDDLSQL(dropDb); - LOG.info("Successful to drop database {}", DB_NAME); - } catch (Exception e) { - LOG.error("Failed to drop database {}", DB_NAME, e); - } - DB_CONNECTION.close(); - } - } - - private static String genRandomUuid() { - return UUID.randomUUID().toString().replace("-", "_"); - } - - private static void executeSRDDLSQL(String sql) throws Exception { - try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) { - statement.execute(); - } - } +public class StarRocksSinkITTest extends StarRocksSinkITTestBase { @Test public void testDupKeyWriteFullColumnsInOrder() throws Exception { @@ -234,7 +158,7 @@ private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo Table in = tEnv.fromDataStream(srcDs); tEnv.createTemporaryView("src", in); tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await(); - List> actualData = scanTable(DB_NAME, tableName); + List> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName); verifyResult(expectedData, actualData); } @@ -356,7 +280,7 @@ private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo Table in = tEnv.fromDataStream(srcDs); tEnv.createTemporaryView("src", in); tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await(); - List> actualData = scanTable(DB_NAME, tableName); + List> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName); verifyResult(expectedData, actualData); } @@ -422,48 +346,7 @@ public void testPKUpsertAndDelete() throws Exception { Arrays.asList(1, 3.0f, "row3"), Arrays.asList(3, 3.0f, "row3") ); - List> actualData = scanTable(DB_NAME, tableName); + List> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName); verifyResult(expectedData, actualData); } - - // Scan table, and returns a collection of rows - private static List> scanTable(String db, String table) throws SQLException { - try (PreparedStatement statement = DB_CONNECTION.prepareStatement(String.format("SELECT * FROM `%s`.`%s`", db, table))) { - try (ResultSet resultSet = statement.executeQuery()) { - List> results = new ArrayList<>(); - int numColumns = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) { - List row = new ArrayList<>(); - for (int i = 1; i <= numColumns; i++) { - row.add(resultSet.getObject(i)); - } - results.add(row); - } - return results; - } - } - } - - private static void verifyResult(List> expected, List> actual) { - List expectedRows = new ArrayList<>(); - List actualRows = new ArrayList<>(); - for (List row : expected) { - StringJoiner joiner = new StringJoiner(","); - for (Object col : row) { - joiner.add(col == null ? 
"null" : col.toString()); - } - expectedRows.add(joiner.toString()); - } - expectedRows.sort(String::compareTo); - - for (List row : actual) { - StringJoiner joiner = new StringJoiner(","); - for (Object col : row) { - joiner.add(col == null ? "null" : col.toString()); - } - actualRows.add(joiner.toString()); - } - actualRows.sort(String::compareTo); - assertArrayEquals(expectedRows.toArray(), actualRows.toArray()); - } } diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java new file mode 100644 index 00000000..2a3fe611 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.sink; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.UUID; + +import static org.junit.Assume.assumeTrue; + +public abstract class StarRocksSinkITTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTestBase.class); + + private static final boolean DEBUG_MODE = false; + protected static String DB_NAME; + protected static String HTTP_URLS; + protected static String JDBC_URLS; + + protected static String getHttpUrls() { + return HTTP_URLS; + } + + protected static String getJdbcUrl() { + return JDBC_URLS; + } + + protected static Connection DB_CONNECTION; + + @BeforeClass + public static void setUp() throws Exception { + HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls"); + JDBC_URLS = DEBUG_MODE ? 
"jdbc:mysql://127.0.0.1:11903" : System.getProperty("jdbc_urls"); + assumeTrue(HTTP_URLS != null && JDBC_URLS != null); + + DB_NAME = "sr_sink_test_" + genRandomUuid(); + try { + DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); + LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); + } catch (Exception e) { + LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); + throw e; + } + + try { + String createDb = "CREATE DATABASE " + DB_NAME; + executeSRDDLSQL(createDb); + LOG.info("Successful to create database {}", DB_NAME); + } catch (Exception e) { + LOG.error("Failed to create database {}", DB_NAME, e); + throw e; + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (DB_CONNECTION != null) { + try { + String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", DB_NAME); + executeSRDDLSQL(dropDb); + LOG.info("Successful to drop database {}", DB_NAME); + } catch (Exception e) { + LOG.error("Failed to drop database {}", DB_NAME, e); + } + DB_CONNECTION.close(); + } + } + + protected static String genRandomUuid() { + return UUID.randomUUID().toString().replace("-", "_"); + } + + protected static void executeSRDDLSQL(String sql) throws Exception { + try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) { + statement.execute(); + } + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksTableUtils.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksTableUtils.java new file mode 100644 index 00000000..1ebd17a7 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksTableUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.it.sink; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.StringJoiner; + +import static org.junit.Assert.assertArrayEquals; + +public class StarRocksTableUtils { + + // Scan table, and returns a collection of rows + public static List> scanTable(Connection dbConnector, String db, String table) throws SQLException { + try (PreparedStatement statement = dbConnector.prepareStatement(String.format("SELECT * FROM `%s`.`%s`", db, table))) { + try (ResultSet resultSet = statement.executeQuery()) { + List> results = new ArrayList<>(); + int numColumns = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + List row = new ArrayList<>(); + for (int i = 1; i <= numColumns; i++) { + row.add(resultSet.getObject(i)); + } + results.add(row); + } + return results; + } + } + } + + public static void verifyResult(List> expected, List> actual) { + List expectedRows = new ArrayList<>(); + List actualRows = new ArrayList<>(); + for (List row : expected) { + StringJoiner joiner = new StringJoiner(","); + for (Object col : row) { + joiner.add(col == null ? "null" : col.toString()); + } + expectedRows.add(joiner.toString()); + } + expectedRows.sort(String::compareTo); + + for (List row : actual) { + StringJoiner joiner = new StringJoiner(","); + for (Object col : row) { + joiner.add(col == null ? "null" : col.toString()); + } + actualRows.add(joiner.toString()); + } + actualRows.sort(String::compareTo); + assertArrayEquals(expectedRows.toArray(), actualRows.toArray()); + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java new file mode 100644 index 00000000..2f632d8b --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestBase.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.it.sink.kafka; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.Assume.assumeTrue; + +/** + * Base class for Kafka Table IT Cases. Refer to Flink's KafkaTableTestBase. + */ +public abstract class KafkaTableTestBase extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class); + + private static final boolean DEBUG_MODE = false; + + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final int zkTimeoutMills = 30000; + + protected static String SR_DB_NAME; + protected static String SR_HTTP_URLS; + protected static String SR_JDBC_URLS; + + protected static String getSrHttpUrls() { + return SR_HTTP_URLS; + } + + protected static String getSrJdbcUrl() { + return SR_JDBC_URLS; + } + + protected static Connection SR_DB_CONNECTION; + + @ClassRule + public static final KafkaContainer KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.3.0")) { + @Override + protected void doStart() { + super.doStart(); + if (LOG.isInfoEnabled()) { + this.followOutput(new Slf4jLogConsumer(LOG)); + } + } + }.withEmbeddedZookeeper() + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS) + .withEnv( + "KAFKA_TRANSACTION_MAX_TIMEOUT_MS", + String.valueOf(Duration.ofHours(2).toMillis())) + // Disable log deletion to prevent records from being deleted during test run + .withEnv("KAFKA_LOG_RETENTION_MS", "-1"); + + protected StreamExecutionEnvironment env; + protected StreamTableEnvironment tEnv; + + // Timer for scheduling logging task if the test hangs + private final Timer loggingTimer = new Timer("Debug Logging Timer"); + + @BeforeClass + public static void setUp() throws Exception { + SR_HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls"); + SR_JDBC_URLS = DEBUG_MODE ? 
"jdbc:mysql://127.0.0.1:11903" : System.getProperty("jdbc_urls"); + assumeTrue(SR_HTTP_URLS != null && SR_JDBC_URLS != null); + + SR_DB_NAME = "sr_sink_test_" + genRandomUuid(); + try { + SR_DB_CONNECTION = DriverManager.getConnection(getSrJdbcUrl(), "root", ""); + LOG.info("Success to create db connection via jdbc {}", getSrJdbcUrl()); + } catch (Exception e) { + LOG.error("Failed to create db connection via jdbc {}", getSrJdbcUrl(), e); + throw e; + } + + try { + String createDb = "CREATE DATABASE " + SR_DB_NAME; + executeSrSQL(createDb); + LOG.info("Successful to create database {}", SR_DB_NAME); + } catch (Exception e) { + LOG.error("Failed to create database {}", SR_DB_NAME, e); + throw e; + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (SR_DB_CONNECTION != null) { + try { + String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", SR_DB_NAME); + executeSrSQL(dropDb); + LOG.info("Successful to drop database {}", SR_DB_NAME); + } catch (Exception e) { + LOG.error("Failed to drop database {}", SR_DB_NAME, e); + } + SR_DB_CONNECTION.close(); + } + } + + @Before + public void setup() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + + // Probe Kafka broker status per 30 seconds + scheduleTimeoutLogger( + Duration.ofSeconds(30), + () -> { + // List all non-internal topics + final Map topicDescriptions = + describeExternalTopics(); + LOG.info("Current existing topics: {}", topicDescriptions.keySet()); + + // Log status of topics + logTopicPartitionStatus(topicDescriptions); + }); + } + + @After + public void after() { + // Cancel timer for debug logging + cancelTimeoutLogger(); + } + + public Properties getStandardProps() { + Properties standardProps = new Properties(); + standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); + standardProps.put("group.id", "flink-tests"); + standardProps.put("enable.auto.commit", false); + standardProps.put("auto.offset.reset", "earliest"); + standardProps.put("max.partition.fetch.bytes", 256); + standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills); + standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills); + return standardProps; + } + + public String getBootstrapServers() { + return KAFKA_CONTAINER.getBootstrapServers(); + } + + public void createTestTopic(String topic, int numPartitions, int replicationFactor) { + Map properties = new HashMap<>(); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); + try (AdminClient admin = AdminClient.create(properties)) { + admin.createTopics( + Collections.singletonList( + new NewTopic(topic, numPartitions, (short) replicationFactor))); + } + } + + public Map getConsumerOffset(String groupId) { + Map properties = new HashMap<>(); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); + try (AdminClient admin = AdminClient.create(properties)) { + ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(groupId); + return result.partitionsToOffsetAndMetadata().get(20, TimeUnit.SECONDS); + } catch (Exception e) { + throw new IllegalStateException( + String.format("Fail to get consumer offsets with the group id [%s].", groupId), + e); + } + } + + public void deleteTestTopic(String topic) { + Map properties = new HashMap<>(); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()); + try 
(AdminClient admin = AdminClient.create(properties)) { + admin.deleteTopics(Collections.singletonList(topic)); + } + } + + // ------------------------ For Debug Logging Purpose ---------------------------------- + + private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) { + TimerTask timeoutLoggerTask = + new TimerTask() { + @Override + public void run() { + try { + loggingAction.run(); + } catch (Exception e) { + throw new RuntimeException("Failed to execute logging action", e); + } + } + }; + loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis()); + } + + private void cancelTimeoutLogger() { + loggingTimer.cancel(); + } + + private Map describeExternalTopics() { + try (final AdminClient adminClient = AdminClient.create(getStandardProps())) { + final List topics = + adminClient.listTopics().listings().get().stream() + .filter(listing -> !listing.isInternal()) + .map(TopicListing::name) + .collect(Collectors.toList()); + + return adminClient.describeTopics(topics).all().get(); + } catch (Exception e) { + throw new RuntimeException("Failed to list Kafka topics", e); + } + } + + private void logTopicPartitionStatus(Map topicDescriptions) { + final Properties properties = getStandardProps(); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging"); + properties.setProperty( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + properties.setProperty( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + final KafkaConsumer consumer = new KafkaConsumer(properties); + List partitions = new ArrayList<>(); + topicDescriptions.forEach( + (topic, description) -> + description + .partitions() + .forEach( + tpInfo -> + partitions.add( + new TopicPartition( + topic, tpInfo.partition())))); + final Map beginningOffsets = consumer.beginningOffsets(partitions); + final Map endOffsets = consumer.endOffsets(partitions); + partitions.forEach( + partition -> + LOG.info( + "TopicPartition \"{}\": starting offset: {}, stopping offset: {}", + partition, + beginningOffsets.get(partition), + endOffsets.get(partition))); + } + + + protected static String genRandomUuid() { + return UUID.randomUUID().toString().replace("-", "_"); + } + + protected static void executeSrSQL(String sql) throws Exception { + try (PreparedStatement statement = SR_DB_CONNECTION.prepareStatement(sql)) { + statement.execute(); + } + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java new file mode 100644 index 00000000..a9ac0d73 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaTableTestUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.sink.kafka; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.utils.TableTestMatchers; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Utils for kafka table tests. Refer to Flink's KafkaTableTestUtils. + */ +public class KafkaTableTestUtils { + public static List collectRows(Table table, int expectedSize) throws Exception { + final TableResult result = table.execute(); + final List collectedRows = new ArrayList<>(); + try (CloseableIterator iterator = result.collect()) { + while (collectedRows.size() < expectedSize && iterator.hasNext()) { + collectedRows.add(iterator.next()); + } + } + result.getJobClient() + .ifPresent( + jc -> { + try { + jc.cancel().get(5, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + return collectedRows; + } + + public static List readLines(String resource) throws IOException { + final URL url = KafkaToStarRocksITTest.class.getClassLoader().getResource(resource); + assert url != null; + Path path = new File(url.getFile()).toPath(); + return Files.readAllLines(path); + } + + public static void waitingExpectedResults( + String sinkName, List expected, Duration timeout) + throws InterruptedException, TimeoutException { + Collections.sort(expected); + CommonTestUtils.waitUtil( + () -> { + List actual = TestValuesTableFactory.getResults(sinkName); + Collections.sort(actual); + return expected.equals(actual); + }, + timeout, + "Can not get the expected result."); + } + + public static void comparedWithKeyAndOrder( + Map> expectedData, List actual, int[] keyLoc) { + Map> actualData = new HashMap<>(); + for (Row row : actual) { + Row key = Row.project(row, keyLoc); + // ignore row kind + key.setKind(RowKind.INSERT); + actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row); + } + // compare key first + assertEquals("Actual result: " + actual, expectedData.size(), actualData.size()); + // compare by value + for (Row key : expectedData.keySet()) { + assertThat( + actualData.get(key), + TableTestMatchers.deepEqualTo(expectedData.get(key), false)); + } + } +} + diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java new file mode 100644 index 00000000..78337700 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/sink/kafka/KafkaToStarRocksITTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.sink.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.TableResult; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable; +import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult; +import static com.starrocks.connector.flink.it.sink.kafka.KafkaTableTestUtils.readLines; + +// Tests for the pipeline from kafka to starrocks. Refer to Flink's KafkaChangelogTableITCase. 
+public class KafkaToStarRocksITTest extends KafkaTableTestBase { + + @Before + public void before() { + // we have to use single parallelism, + // because we will count the messages in sink to terminate the job + env.setParallelism(1); + } + + @Test + public void testUpsertAndDelete() throws Exception { + // create kafka topic and write data + final String topic = "upsert_and_delete_topic"; + createTestTopic(topic, 1, 1); + List lines = readLines("data/debezium-upsert-and-delete.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + + // create SR table + String tableName = "testUpsertAndDelete_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 FLOAT," + + "c2 STRING" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + SR_DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + + // ---------- Produce an event time stream into Kafka ------------------- + String bootstraps = getBootstrapServers(); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " c0 INT NOT NULL," + + " c1 FLOAT," + + " c2 STRING," + + " PRIMARY KEY (c0) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'kafka'," + + " 'topic' = '%s'," + + " 'properties.bootstrap.servers' = '%s'," + + " 'scan.startup.mode' = 'earliest-offset'," + + " 'value.format' = 'debezium-json'" + + ")", + topic, bootstraps); + String sinkDDL = + String.format( + "CREATE TABLE sink(" + + "c0 INT," + + "c1 FLOAT," + + "c2 STRING," + + "PRIMARY KEY (`c0`) NOT ENFORCED" + + ") WITH ( " + + "'connector' = 'starrocks'," + + "'jdbc-url'='%s'," + + "'load-url'='%s'," + + "'database-name' = '%s'," + + "'table-name' = '%s'," + + "'username' = '%s'," + + "'password' = '%s'," + + "'sink.buffer-flush.interval-ms' = '1000'" + + ")", + getSrJdbcUrl(), getSrHttpUrls(), SR_DB_NAME, tableName, "root", ""); + + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + TableResult tableResult = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + // TODO find an elegant way to wait for the result + try { + Thread.sleep(10000); + } catch (Exception e) { + // ignore + } + tableResult.getJobClient().get().cancel().get(); // stop the job + + List> expectedData = Arrays.asList( + Arrays.asList(1, 2.0f, "row1"), + Arrays.asList(2, 2.0f, "row2") + ); + List> actualData = scanTable(SR_DB_CONNECTION, SR_DB_NAME, tableName); + verifyResult(expectedData, actualData); + + deleteTestTopic(topic); + } + + private void writeRecordsToKafka(String topic, List lines) throws Exception { + DataStreamSource stream = env.fromCollection(lines); + SerializationSchema serSchema = new SimpleStringSchema(); + FlinkKafkaPartitioner partitioner = new FlinkFixedPartitioner<>(); + + // the producer must not produce duplicates + Properties producerProperties = getStandardProps(); + producerProperties.setProperty("retries", "0"); + stream.sinkTo( + KafkaSink.builder() + .setBootstrapServers( + producerProperties.getProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic(topic) + .setValueSerializationSchema(serSchema) + .setPartitioner(partitioner) + .build()) + .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .build()); + env.execute("Write sequence"); + } + +} diff --git 
a/src/test/resources/data/debezium-upsert-and-delete.txt b/src/test/resources/data/debezium-upsert-and-delete.txt new file mode 100644 index 00000000..9de9d52f --- /dev/null +++ b/src/test/resources/data/debezium-upsert-and-delete.txt @@ -0,0 +1,5 @@ +{"op":"c","ts_ms":1589355606100,"before":null,"after":{"c0":1,"c1":1.0,"c2":"row1"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null} +{"op":"c","ts_ms":1589355606200,"before":null,"after":{"c0":2,"c1":2.0,"c2":"row2"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null} +{"op":"c","ts_ms":1589355606300,"before":null,"after":{"c0":3,"c1":3.0,"c2":"row3"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null} +{"op":"u","ts_ms":1589355606400,"before":{"c0":1,"c1":1.0,"c2":"row1"},"after":{"c0":1,"c1":2.0,"c2":"row1"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null} +{"op":"d","ts_ms":1589355606500,"before":{"c0":3,"c1":3.0,"c2":"row3"},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"transaction":null} diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 00000000..b4b0660e --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +log4j.rootLogger=OFF, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger +log4j.logger.org.apache.zookeeper=OFF, testlogger +log4j.logger.state.change.logger=OFF, testlogger +log4j.logger.kafka=OFF, testlogger \ No newline at end of file
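
Usage note for the new integration tests (a sketch based on the test code above, not prescribed by the patch): the StarRocks sink ITs skip themselves via assumeTrue unless the cluster endpoints are supplied as JVM system properties, and the Kafka-to-StarRocks ITs additionally need a running Docker daemon for the Testcontainers Kafka broker. Assuming the default Surefire behavior of forwarding command-line -D properties to the forked test JVM, an invocation would look like:

    mvn test -Dhttp_urls=FE_HOST:FE_HTTP_PORT -Djdbc_urls=jdbc:mysql://FE_HOST:FE_QUERY_PORT

The FE_HOST/FE_*_PORT placeholders are illustrative; the DEBUG_MODE constants in the test base classes show the same pattern with 127.0.0.1:11901 for the HTTP (stream load) endpoint and jdbc:mysql://127.0.0.1:11903 for the MySQL-protocol endpoint.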