[UT] Add IT framework for pipeline from kafka to starrocks
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Jul 5, 2023
1 parent f0d9a05 commit 6d595c8
Showing 9 changed files with 949 additions and 141 deletions.
173 changes: 156 additions & 17 deletions pom.xml
@@ -56,6 +56,8 @@ limitations under the License.
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<flink.version>1.15.0</flink.version>
<arrow.version>5.0.0</arrow.version>
+<kafka.version>2.8.1</kafka.version>
+<scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
@@ -66,18 +68,17 @@ limitations under the License.
<classifier>jar-with-dependencies</classifier>
</dependency>

<!-- Table ecosystem -->
<dependency>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-table-planner-loader</artifactId>
+<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
-<version>1.15.0</version>
+<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -153,25 +154,162 @@ limitations under the License.
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>

<!-- The following dependencies are mainly used for tests of the kafka -> starrocks pipeline, and are copied from flink; a usage sketch follows this file's diff -->
<dependency>
<groupId>org.apache.flink</groupId>
-<artifactId>flink-test-utils</artifactId>
+<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<!-- include 2.8 server for tests -->
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<artifactId>flink-runtime</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-clients</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>curator-test</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<!-- Kafka table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- Kafka SQL IT test with formats -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
@@ -187,6 +325,7 @@ limitations under the License.
<scope>test</scope>
</dependency>
</dependencies>

<build>
<resources>
<resource>
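Taken together, the new test dependencies above (flink-connector-kafka, the Kafka broker and Testcontainers modules, the Flink test utilities, and the json/avro/csv formats) are what an end-to-end kafka -> starrocks test needs: a disposable broker plus Flink SQL to move rows from a topic into a StarRocks table. Below is a minimal sketch of such a pipeline, not code from this commit: the topic, database, and table names are hypothetical, and the StarRocks URLs are assumed to come from the same http_urls/jdbc_urls system properties the existing ITs read.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaToStarRocksSketch {
    public static void main(String[] args) throws Exception {
        // Start a throwaway Kafka broker in Docker (org.testcontainers:kafka).
        try (KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
            kafka.start();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Source: JSON records from Kafka (flink-connector-kafka + flink-json).
            tEnv.executeSql(
                "CREATE TABLE kafka_src (id INT, name STRING) WITH ("
                    + " 'connector' = 'kafka',"
                    + " 'topic' = 'test_topic'," // hypothetical topic
                    + " 'properties.bootstrap.servers' = '" + kafka.getBootstrapServers() + "',"
                    + " 'properties.group.id' = 'it_group',"
                    + " 'scan.startup.mode' = 'earliest-offset',"
                    + " 'format' = 'json')");

            // Sink: a StarRocks table via this connector; database and table
            // names are made up for the sketch.
            tEnv.executeSql(
                "CREATE TABLE sr_sink (id INT, name STRING) WITH ("
                    + " 'connector' = 'starrocks',"
                    + " 'jdbc-url' = '" + System.getProperty("jdbc_urls") + "',"
                    + " 'load-url' = '" + System.getProperty("http_urls") + "',"
                    + " 'database-name' = 'sr_sink_test',"
                    + " 'table-name' = 'pk_table',"
                    + " 'username' = 'root',"
                    + " 'password' = '')");

            // Kick off the streaming insert; a real test would produce records to
            // the topic first and then poll the StarRocks table until the rows appear.
            tEnv.executeSql("INSERT INTO sr_sink SELECT * FROM kafka_src");
        }
    }
}
```

Using Testcontainers here means the IT needs no externally managed Kafka; each run gets a fresh broker that is torn down with the container.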
StarRocksSinkITTest.java
@@ -30,92 +30,16 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;

-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.StringJoiner;
-import java.util.UUID;

-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assume.assumeTrue;
+import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.scanTable;
+import static com.starrocks.connector.flink.it.sink.StarRocksTableUtils.verifyResult;

-public class StarRocksSinkITTest {
-
-private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTest.class);
-
-private static String DB_NAME;
-private static String HTTP_URLS = null; // "127.0.0.1:11901";
-private static String JDBC_URLS = null; // "jdbc:mysql://127.0.0.1:11903";
-
-private static String getHttpUrls() {
-return HTTP_URLS;
-}
-
-private static String getJdbcUrl() {
-return JDBC_URLS;
-}
-
-private static Connection DB_CONNECTION;
-
-@BeforeClass
-public static void setUp() throws Exception {
-HTTP_URLS = System.getProperty("http_urls");
-JDBC_URLS = System.getProperty("jdbc_urls");
-assumeTrue(HTTP_URLS != null && JDBC_URLS != null);
-
-DB_NAME = "sr_sink_test_" + genRandomUuid();
-try {
-DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", "");
-LOG.info("Success to create db connection via jdbc {}", getJdbcUrl());
-} catch (Exception e) {
-LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e);
-throw e;
-}
-
-try {
-String createDb = "CREATE DATABASE " + DB_NAME;
-executeSRDDLSQL(createDb);
-LOG.info("Successful to create database {}", DB_NAME);
-} catch (Exception e) {
-LOG.error("Failed to create database {}", DB_NAME, e);
-throw e;
-}
-}
-
-@AfterClass
-public static void tearDown() throws Exception {
-if (DB_CONNECTION != null) {
-try {
-String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", DB_NAME);
-executeSRDDLSQL(dropDb);
-LOG.info("Successful to drop database {}", DB_NAME);
-} catch (Exception e) {
-LOG.error("Failed to drop database {}", DB_NAME, e);
-}
-DB_CONNECTION.close();
-}
-}
-
-private static String genRandomUuid() {
-return UUID.randomUUID().toString().replace("-", "_");
-}
-
-private static void executeSRDDLSQL(String sql) throws Exception {
-try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) {
-statement.execute();
-}
-}
+public class StarRocksSinkITTest extends StarRocksSinkITTestBase {

@Test
public void testDupKeyWriteFullColumnsInOrder() throws Exception {
@@ -234,7 +158,7 @@ private void testDupKeyWriteBase(String tableName, String flinkDDL, RowTypeInfo
Table in = tEnv.fromDataStream(srcDs);
tEnv.createTemporaryView("src", in);
tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();
-List<List<Object>> actualData = scanTable(DB_NAME, tableName);
+List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

@@ -356,7 +280,7 @@ private void testPkWriteForBase(String tableName, String flinkDDL, RowTypeInfo
Table in = tEnv.fromDataStream(srcDs);
tEnv.createTemporaryView("src", in);
tEnv.executeSql("INSERT INTO sink SELECT * FROM src").await();
-List<List<Object>> actualData = scanTable(DB_NAME, tableName);
+List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

@@ -422,48 +346,7 @@ public void testPKUpsertAndDelete() throws Exception {
Arrays.asList(1, 3.0f, "row3"),
Arrays.asList(3, 3.0f, "row3")
);
-List<List<Object>> actualData = scanTable(DB_NAME, tableName);
+List<List<Object>> actualData = scanTable(DB_CONNECTION, DB_NAME, tableName);
verifyResult(expectedData, actualData);
}

-// Scan table, and returns a collection of rows
-private static List<List<Object>> scanTable(String db, String table) throws SQLException {
-try (PreparedStatement statement = DB_CONNECTION.prepareStatement(String.format("SELECT * FROM `%s`.`%s`", db, table))) {
-try (ResultSet resultSet = statement.executeQuery()) {
-List<List<Object>> results = new ArrayList<>();
-int numColumns = resultSet.getMetaData().getColumnCount();
-while (resultSet.next()) {
-List<Object> row = new ArrayList<>();
-for (int i = 1; i <= numColumns; i++) {
-row.add(resultSet.getObject(i));
-}
-results.add(row);
-}
-return results;
-}
-}
-}
-
-private static void verifyResult(List<List<Object>> expected, List<List<Object>> actual) {
-List<String> expectedRows = new ArrayList<>();
-List<String> actualRows = new ArrayList<>();
-for (List<Object> row : expected) {
-StringJoiner joiner = new StringJoiner(",");
-for (Object col : row) {
-joiner.add(col == null ? "null" : col.toString());
-}
-expectedRows.add(joiner.toString());
-}
-expectedRows.sort(String::compareTo);
-
-for (List<Object> row : actual) {
-StringJoiner joiner = new StringJoiner(",");
-for (Object col : row) {
-joiner.add(col == null ? "null" : col.toString());
-}
-actualRows.add(joiner.toString());
-}
-actualRows.sort(String::compareTo);
-assertArrayEquals(expectedRows.toArray(), actualRows.toArray());
-}
}
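The connection plumbing deleted above does not disappear: the test class now extends StarRocksSinkITTestBase, and the scanTable/verifyResult helpers move into StarRocksTableUtils with the Connection passed explicitly, so the new kafka -> starrocks ITs can share them. The base class itself is among the changed files not shown on this page; the following is a hedged reconstruction from the deleted code, where member visibility and anything not visible above are guesses.

```java
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

import static org.junit.Assume.assumeTrue;

// Hypothetical sketch of StarRocksSinkITTestBase, reconstructed from the code
// removed in this commit; the real file may differ.
public abstract class StarRocksSinkITTestBase {

    protected static String DB_NAME;
    protected static String HTTP_URLS;
    protected static String JDBC_URLS;
    protected static Connection DB_CONNECTION;

    @BeforeClass
    public static void setUp() throws Exception {
        // The ITs are skipped unless a live StarRocks cluster is supplied, e.g.
        //   mvn test -Dhttp_urls=127.0.0.1:11901 -Djdbc_urls=jdbc:mysql://127.0.0.1:11903
        HTTP_URLS = System.getProperty("http_urls");
        JDBC_URLS = System.getProperty("jdbc_urls");
        assumeTrue(HTTP_URLS != null && JDBC_URLS != null);

        // One throwaway database per run, dropped again in tearDown().
        DB_NAME = "sr_sink_test_" + UUID.randomUUID().toString().replace("-", "_");
        DB_CONNECTION = DriverManager.getConnection(JDBC_URLS, "root", "");
        executeSRDDLSQL("CREATE DATABASE " + DB_NAME);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (DB_CONNECTION != null) {
            executeSRDDLSQL(String.format("DROP DATABASE IF EXISTS %s FORCE", DB_NAME));
            DB_CONNECTION.close();
        }
    }

    protected static void executeSRDDLSQL(String sql) throws Exception {
        try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) {
            statement.execute();
        }
    }
}
```

Centralizing the database lifecycle in a base class means each new IT class (for example, the Kafka pipeline tests this commit sets up) inherits setup, teardown, and skip-if-no-cluster behavior instead of copying the JDBC boilerplate.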