diff --git a/src/main/java/com/starrocks/connector/flink/catalog/CatalogOptions.java b/src/main/java/com/starrocks/connector/flink/catalog/CatalogOptions.java new file mode 100644 index 00000000..38a804a3 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/CatalogOptions.java @@ -0,0 +1,70 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +public class CatalogOptions { + + public static final String IDENTIFIER = "starrocks"; + + public static ConfigOption FE_JDBC_URL = + ConfigOptions.key("jdbc-url") + .stringType() + .noDefaultValue() + .withDescription("StarRocks JDBC url like: `jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...`."); + + public static ConfigOption FE_HTTP_URL = + ConfigOptions.key("http-url") + .stringType() + .noDefaultValue() + .withDescription("StarRocks FE http url like: `fe_ip1:http_port,http://fe_ip2:http_port`."); + + public static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("StarRocks user name."); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("StarRocks user password."); + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .noDefaultValue() + .withDescription("The default database."); + + // ------ options for create table ------ + + public static final String TABLE_PROPERTIES_PREFIX = "table.properties."; + + public static final ConfigOption TABLE_NUM_BUCKETS = + ConfigOptions.key("table.num-buckets") + .intType() + .noDefaultValue() + .withDescription("Number of buckets for creating StarRocks table."); +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/ConfigUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/ConfigUtils.java new file mode 100644 index 00000000..5139f54b --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/ConfigUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import java.util.HashMap; +import java.util.Map; + +public class ConfigUtils { + + public static Map getPrefixConfigs( + String prefix, Map conf, boolean removePrefix) { + Map result = new HashMap<>(); + for (Map.Entry entry : conf.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(prefix)) { + if (removePrefix) { + result.put(key.substring(prefix.length()), entry.getValue()); + } else { + result.put(key, entry.getValue()); + } + } + } + return result; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/JdbcUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/JdbcUtils.java new file mode 100644 index 00000000..4287355e --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/JdbcUtils.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.table.api.ValidationException; + +public class JdbcUtils { + + // Driver name for mysql connector 5.1 which is deprecated in 8.0 + private static final String MYSQL_51_DRIVER_NAME = "com.mysql.jdbc.Driver"; + + // Driver name for mysql connector 8.0 + private static final String MYSQL_80_DRIVER_NAME = "com.mysql.cj.jdbc.Driver"; + + private static final String MYSQL_SITE_URL = "https://dev.mysql.com/downloads/connector/j/"; + private static final String MAVEN_CENTRAL_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/"; + + public static String getJdbcUrl(String host, int port) { + return String.format("jdbc:mysql://%s:%d", host, port); + } + + public static void verifyJdbcDriver() { + try { + Class.forName(MYSQL_80_DRIVER_NAME); + } catch (ClassNotFoundException e) { + try { + Class.forName(MYSQL_51_DRIVER_NAME); + } catch (ClassNotFoundException ie) { + String msg = String.format("Can't find mysql jdbc driver, please download it and " + + "put it in your classpath manually. You can download it from MySQL " + + "site %s, or Maven Central %s", + MYSQL_SITE_URL, MAVEN_CENTRAL_URL); + throw new ValidationException(msg, ie); + } + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java new file mode 100644 index 00000000..ccb5db3b --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java @@ -0,0 +1,618 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.source.StarRocksSourceOptions; +import org.apache.arrow.util.VisibleForTesting; +import org.apache.commons.compress.utils.Lists; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.starrocks.connector.flink.catalog.JdbcUtils.verifyJdbcDriver; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; + +/** Flink catalog for StarRocks. */ +public class StarRocksCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class); + + private final String jdbcUrl; + private final String httpUrl; + private final String username; + private final String password; + private final Configuration sourceBaseConfig; + private final Configuration sinkBaseConfig; + private final Configuration tableBaseConfig; + private final ClassLoader userClassLoader; + + public StarRocksCatalog( + String name, + String jdbcUrl, + String httpUrl, + String username, + String password, + String defaultDatabase, + Configuration sourceBaseConfig, + Configuration sinkBaseConfig, + Configuration tableBaseConfig, + ClassLoader userClassLoader) { + super(name, defaultDatabase); + this.jdbcUrl = jdbcUrl; + this.httpUrl = httpUrl; + this.username = username; + this.password = password; + this.sourceBaseConfig = sourceBaseConfig; + this.sinkBaseConfig = sinkBaseConfig; + this.tableBaseConfig = tableBaseConfig; + this.userClassLoader = userClassLoader; + } + + @Override + public Optional getFactory() { + return Optional.of(new StarRocksDynamicTableFactory()); + } + + @Override + public void open() throws CatalogException { + // Refer to flink-connector-jdbc's AbstractJdbcCatalog + // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userClassLoader)) { + verifyJdbcDriver(); + // test connection, fail early if we cannot connect to database + try (Connection conn = getConnection()) { + } catch (SQLException e) { + throw new ValidationException( + String.format("Failed to connect StarRocks via JDBC: %s.", jdbcUrl), e); + } + } + LOG.info("Open StarRocks catalog {}", getName()); + } + + @Override + public void close() throws CatalogException { + LOG.info("Close StarRocks catalog {}", getName()); + } + + // ------ databases ------ + + @Override + public List listDatabases() throws CatalogException { + return executeSingleColumnStatement( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;"); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (databaseExists(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + List dbList = executeSingleColumnStatement( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = ?;", + databaseName); + return !dbList.isEmpty(); + } + + @Override + public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (databaseExists(databaseName)) { + if (ignoreIfExists) { + return; + } + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + + try { + String sql = String.format("CREATE DATABASE %s%s;", + ignoreIfExists ? "IF NOT EXISTS " : "", databaseName); + executeUpdateStatement(sql); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed to create database %s, ignoreIfExists: %s", + databaseName, ignoreIfExists), + e); + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ tables ------ + + @Override + public List listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return executeSingleColumnStatement( + "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", + databaseName); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + List tableList = executeSingleColumnStatement( + "SELECT TABLE_NAME FROM information_schema.`TABLES` " + + "WHERE TABLE_SCHEMA=? and TABLE_NAME=?", + tablePath.getDatabaseName(), + tablePath.getObjectName() + ); + return !tableList.isEmpty(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + try (Connection connection = getConnection()) { + StarRocksTable starRocksTable = StarRocksUtils.getStarRocksTable( + connection, tablePath.getDatabaseName(), tablePath.getObjectName()); + + StarRocksSchema starRocksSchema = starRocksTable.getSchema(); + Schema.Builder flinkSchemaBuilder = Schema.newBuilder(); + Set primaryKeys = new HashSet<>(); + if (starRocksTable.getTableType() == StarRocksTable.TableType.PRIMARY && + starRocksTable.getTableKeys() != null) { + flinkSchemaBuilder.primaryKey(starRocksTable.getTableKeys()); + primaryKeys.addAll(starRocksTable.getTableKeys()); + } + for (StarRocksColumn column : starRocksSchema.getColumns()) { + flinkSchemaBuilder.column( + column.getName(), + TypeUtils.toFlinkType( + column.getType(), + column.getSize(), + column.getScale(), + !primaryKeys.contains(column.getName()) + ) + ); + } + + Schema flinkSchema = flinkSchemaBuilder.build(); + + Map properties = new HashMap<>(); + properties.put(CONNECTOR.key(), CatalogOptions.IDENTIFIER); + properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); + properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); + + return CatalogTable.of(flinkSchema, starRocksTable.getComment(), Lists.newArrayList(), properties); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e); + } + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(getName(), tablePath); + } + + StarRocksTable starRocksTable = StarRocksUtils.toStarRocksTable(getName(), tablePath, tableBaseConfig, table); + String createTableSql = StarRocksUtils.buildCreateTableSql(starRocksTable, ignoreIfExists); + try { + executeUpdateStatement(createTableSql); + LOG.info("Success to create table {} in catalog {}", tablePath.getFullName(), getName()); + LOG.info("The create table DDL:\n{}", createTableSql); + } catch (Exception e) { + LOG.error("Failed to create table {} in catalog {}", tablePath.getFullName(), getName(), e); + LOG.error("The failed create table DDL:\n{}", createTableSql); + throw new CatalogException( + String.format("Failed to create table %s in catalog %s", + tablePath.getFullName(), getName()), e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(getName(), tablePath); + } + + try { + String dropSql = String.format( + "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getObjectName()); + executeUpdateStatement(dropSql); + } catch (Exception e) { + throw new CatalogException(String.format("Failed to drop table %s in catalog %s", + tablePath.getFullName(), getName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + // TODO Flink supports to add/drop column since 1.17. Implement it in the future if needed. + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + private Map getSourceConfig(String database, String table) { + Map sourceConfig = new HashMap<>(sourceBaseConfig.toMap()); + setIfNotExist(sourceConfig, StarRocksSourceOptions.JDBC_URL, jdbcUrl); + setIfNotExist(sourceConfig, StarRocksSourceOptions.SCAN_URL, httpUrl); + setIfNotExist(sourceConfig, StarRocksSourceOptions.USERNAME, username); + setIfNotExist(sourceConfig, StarRocksSourceOptions.PASSWORD, password); + sourceConfig.put(StarRocksSourceOptions.DATABASE_NAME.key(), database); + sourceConfig.put(StarRocksSourceOptions.TABLE_NAME.key(), table); + return sourceConfig; + } + + private Map getSinkConfig(String database, String table) { + Map sinkConfig = new HashMap<>(sinkBaseConfig.toMap()); + setIfNotExist(sinkConfig, StarRocksSinkOptions.JDBC_URL, jdbcUrl); + setIfNotExist(sinkConfig, StarRocksSinkOptions.LOAD_URL, httpUrl); + setIfNotExist(sinkConfig, StarRocksSinkOptions.USERNAME, username); + setIfNotExist(sinkConfig, StarRocksSinkOptions.PASSWORD, password); + sinkConfig.put(StarRocksSinkOptions.DATABASE_NAME.key(), database); + sinkConfig.put(StarRocksSinkOptions.TABLE_NAME.key(), table); + if (sinkConfig.containsKey(StarRocksSinkOptions.SINK_LABEL_PREFIX.key())) { + String rawLabelPrefix = sinkConfig.get(StarRocksSinkOptions.SINK_LABEL_PREFIX.key()); + String labelPrefix = String.join("_", rawLabelPrefix, database, table); + sinkConfig.put(StarRocksSinkOptions.SINK_LABEL_PREFIX.key(), labelPrefix); + } + return sinkConfig; + } + + private void setIfNotExist(Map config, ConfigOption option, String value) { + if (!config.containsKey(option.key())) { + config.put(option.key(), value); + } + } + + private int executeUpdateStatement(String sql) throws SQLException { + try (Connection connection = getConnection(); + Statement statement = connection.createStatement()) { + return statement.executeUpdate(sql); + } + } + + private List executeSingleColumnStatement(String sql, Object... params) { + try (Connection conn = getConnection(); + PreparedStatement statement = conn.prepareStatement(sql)) { + List columnValues = Lists.newArrayList(); + if (params != null) { + for (int i = 0; i < params.length; i++) { + statement.setObject(i + 1, params[i]); + } + } + try (ResultSet rs = statement.executeQuery()) { + while (rs.next()) { + String columnValue = rs.getString(1); + columnValues.add(columnValue); + } + } + return columnValues; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to execute sql: %s", sql), e); + } + } + + private Connection getConnection() throws SQLException { + // TODO reuse the connection + return DriverManager.getConnection(jdbcUrl, username, password); + } + + // ------ views ------ + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ partitions ------ + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + return false; + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ functions ------ + + @Override + public List listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); + } + + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ statistics ------ + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @VisibleForTesting + String getJdbcUrl() { + return jdbcUrl; + } + + @VisibleForTesting + String getHttpUrl() { + return httpUrl; + } + + @VisibleForTesting + String getUsername() { + return username; + } + + @VisibleForTesting + String getPassword() { + return password; + } + + @VisibleForTesting + Configuration getSourceBaseConfig() { + return sourceBaseConfig; + } + + @VisibleForTesting + Configuration getSinkBaseConfig() { + return sinkBaseConfig; + } + + @VisibleForTesting + Configuration getTableBaseConfig() { + return tableBaseConfig; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java new file mode 100644 index 00000000..cba43d28 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java @@ -0,0 +1,95 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.source.StarRocksSourceOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashSet; +import java.util.Set; + +import static com.starrocks.connector.flink.catalog.CatalogOptions.IDENTIFIER; +import static com.starrocks.connector.flink.catalog.ConfigUtils.getPrefixConfigs; + +public class StarRocksCatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validateExcept( + StarRocksSourceOptions.SOURCE_PROPERTIES_PREFIX, + StarRocksSinkOptions.SINK_PROPERTIES_PREFIX, + CatalogOptions.TABLE_PROPERTIES_PREFIX); + + Configuration sourceBaseConfig = + Configuration.fromMap( + getPrefixConfigs("scan.", context.getOptions(), false)); + Configuration sinkBaseConfig = + Configuration.fromMap( + getPrefixConfigs("sink.", context.getOptions(), false)); + Configuration tableBaseConfig = + Configuration.fromMap( + getPrefixConfigs("table.", context.getOptions(), false)); + return new StarRocksCatalog( + context.getName(), + helper.getOptions().get(CatalogOptions.FE_JDBC_URL), + helper.getOptions().get(CatalogOptions.FE_HTTP_URL), + helper.getOptions().get(CatalogOptions.USERNAME), + helper.getOptions().get(CatalogOptions.PASSWORD), + helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE), + sourceBaseConfig, + sinkBaseConfig, + tableBaseConfig, + context.getClassLoader() + ); + } + + @Override + public Set> requiredOptions() { + final Set> options = new HashSet<>(); + options.add(CatalogOptions.FE_JDBC_URL); + options.add(CatalogOptions.FE_HTTP_URL); + options.add(CatalogOptions.USERNAME); + options.add(CatalogOptions.PASSWORD); + options.add(CatalogOptions.DEFAULT_DATABASE); + return options; + } + + @Override + public Set> optionalOptions() { + final Set> options = new HashSet<>(); + StarRocksDynamicTableFactory factory = new StarRocksDynamicTableFactory(); + options.addAll(factory.optionalOptions()); + options.add(CatalogOptions.TABLE_NUM_BUCKETS); + return options; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java new file mode 100644 index 00000000..bebb710b --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java @@ -0,0 +1,164 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import java.io.Serializable; +import javax.annotation.Nullable; + +/** Describe a column of StarRocks table. */ +public class StarRocksColumn implements Serializable { + + private static final long serialVersionUID = 1L; + + private final String name; + // Index of column in table (starting at 0) + private final int ordinalPosition; + private final String type; + @Nullable + private final String key; + @Nullable + private final Integer size; + @Nullable + private final Integer scale; + @Nullable + private final String defaultValue; + private final boolean isNullable; + @Nullable + private final String comment; + + private StarRocksColumn(String name, int ordinalPosition, String type, @Nullable String key, + @Nullable Integer size, @Nullable Integer scale, @Nullable String defaultValue, + boolean isNullable, @Nullable String comment) { + this.name = name; + this.ordinalPosition = ordinalPosition; + this.type = type; + this.key = key; + this.size = size; + this.scale = scale; + this.defaultValue = defaultValue; + this.isNullable = isNullable; + this.comment = comment; + } + + public String getName() { + return name; + } + + public int getOrdinalPosition() { + return ordinalPosition; + } + + public String getType() { + return type; + } + + @Nullable + public String getKey() { + return key; + } + + @Nullable + public Integer getSize() { + return size; + } + + @Nullable + public Integer getScale() { + return scale; + } + + @Nullable + public String getDefaultValue() { + return defaultValue; + } + + public boolean isNullable() { + return isNullable; + } + + @Nullable + public String getComment() { + return comment; + } + + public static class Builder { + + private String name; + private int ordinalPosition; + private String type; + private String key; + private Integer size; + private Integer scale; + private String defaultValue; + boolean isNullable; + private String comment; + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setOrdinalPosition(int ordinalPosition) { + this.ordinalPosition = ordinalPosition; + return this; + } + + public Builder setType(String type) { + this.type = type; + return this; + } + + public Builder setKey(String key) { + this.key = key; + return this; + } + + public Builder setSize(Integer size) { + this.size = size; + return this; + } + + public Builder setScale(Integer scale) { + this.scale = scale; + return this; + } + + public Builder setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + return this; + } + + public Builder setNullable(boolean nullable) { + isNullable = nullable; + return this; + } + + public Builder setComment(String comment) { + this.comment = comment; + return this; + } + + public StarRocksColumn build() { + return new StarRocksColumn(name, ordinalPosition, type, key, size, scale, + defaultValue, isNullable, comment); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksDynamicTableFactory.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksDynamicTableFactory.java new file mode 100644 index 00000000..97679d43 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksDynamicTableFactory.java @@ -0,0 +1,79 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSinkFactory; +import com.starrocks.connector.flink.table.source.StarRocksDynamicTableSourceFactory; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; + +import java.util.HashSet; +import java.util.Set; + +/** + * Note this factory is only used for catalog currently, and can unify it with + * {@link StarRocksDynamicTableSourceFactory} and {@link StarRocksDynamicTableSinkFactory} + * in the future. + */ +public class StarRocksDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory { + + private final StarRocksDynamicTableSourceFactory sourceFactory; + private final StarRocksDynamicTableSinkFactory sinkFactory; + + public StarRocksDynamicTableFactory() { + this.sourceFactory = new StarRocksDynamicTableSourceFactory(); + this.sinkFactory = new StarRocksDynamicTableSinkFactory(); + } + + @Override + public String factoryIdentifier() { + return CatalogOptions.IDENTIFIER; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + return sourceFactory.createDynamicTableSource(context, false); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + return sinkFactory.createDynamicTableSink(context, false); + } + + @Override + public Set> requiredOptions() { + Set> configOptions = new HashSet<>(); + configOptions.addAll(sourceFactory.requiredOptions()); + configOptions.addAll(sinkFactory.requiredOptions()); + return configOptions; + } + + @Override + public Set> optionalOptions() { + Set> configOptions = new HashSet<>(); + configOptions.addAll(sourceFactory.optionalOptions()); + configOptions.addAll(sinkFactory.optionalOptions()); + return configOptions; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java new file mode 100644 index 00000000..cc94821c --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Describe the schema of a StarRocks table. */ +public class StarRocksSchema implements Serializable { + + private static final long serialVersionUID = 1L; + + // Have been sorted by the ordinal positions of columns. + private final List columns; + // Map column names to the Column structure. + private final Map columnMap; + + private StarRocksSchema(List columns) { + this.columns = new ArrayList<>(columns); + this.columns.sort(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition)); + this.columnMap = new HashMap<>(); + this.columns.forEach(column -> columnMap.put(column.getName(), column)); + } + + public StarRocksColumn getColumn(String columnName) { + return columnMap.get(columnName); + } + + public List getColumns() { + return columns; + } + + public static class Builder { + + private final List columns = new ArrayList<>(); + + public Builder addColumn(StarRocksColumn column) { + columns.add(column); + return this; + } + + public StarRocksSchema build() { + Preconditions.checkState(!columns.isEmpty(), "There should be at least one column."); + return new StarRocksSchema(columns); + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java new file mode 100644 index 00000000..83c523de --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java @@ -0,0 +1,211 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +/** Describe a StarRocks table. */ +public class StarRocksTable implements Serializable { + + enum TableType { + UNKNOWN, + DUPLICATE, + AGGREGATE, + UNIQUE, + PRIMARY + } + + private static final long serialVersionUID = 1L; + + private final String database; + + private final String table; + + private final StarRocksSchema schema; + + private final TableType tableType; + + // Following fields can be nullable if we can not get the metas from StarRocks + + // For different tables, have different meaning. It's duplicate keys for duplicate table, + // aggregate keys for aggregate table, unique keys for unique table, and primary keys for + // primary table. The list has been sorted by the ordinal positions of the columns. + @Nullable + private final List tableKeys; + + @Nullable + private final List partitionKeys; + + @Nullable + private final Integer numBuckets; + + @Nullable + public final String comment; + + @Nullable + private final Map properties; + + private StarRocksTable(String database, String table, StarRocksSchema schema, TableType tableType, + @Nullable List tableKeys, @Nullable List partitionKeys, + @Nullable Integer numBuckets, @Nullable String comment, + @Nullable Map properties) { + this.database = Preconditions.checkNotNull(database); + this.table = Preconditions.checkNotNull(table); + this.schema = Preconditions.checkNotNull(schema); + this.tableType = Preconditions.checkNotNull(tableType); + this.tableKeys = tableKeys; + this.partitionKeys = partitionKeys; + this.numBuckets = numBuckets; + this.comment = comment; + this.properties = properties; + } + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public StarRocksSchema getSchema() { + return schema; + } + + @Nullable + public TableType getTableType() { + return tableType; + } + + @Nullable + public List getTableKeys() { + return tableKeys; + } + + @Nullable + public List getPartitionKeys() { + return partitionKeys; + } + + @Nullable + public Integer getNumBuckets() { + return numBuckets; + } + + @Nullable + public String getComment() { + return comment; + } + + @Nullable + public Map getProperties() { + return properties; + } + + public static class Builder { + + private String database; + private String table; + private StarRocksSchema schema; + private TableType tableType; + private List tableKeys; + private List partitionKeys; + private Integer numBuckets; + private String comment; + private Map properties; + + public Builder setDatabase(String database) { + this.database = database; + return this; + } + + public Builder setTable(String table) { + this.table = table; + return this; + } + + public Builder setSchema(StarRocksSchema schema) { + this.schema = schema; + return this; + } + + public Builder setTableType(TableType tableType) { + this.tableType = tableType; + return this; + } + + public Builder setTableKeys(List tableKeys) { + this.tableKeys = new ArrayList<>(tableKeys); + return this; + } + + public Builder setPartitionKeys(List partitionKeys) { + this.partitionKeys = new ArrayList<>(partitionKeys); + return this; + } + + public Builder setNumBuckets(int numBuckets) { + this.numBuckets = numBuckets; + return this; + } + + public Builder setComment(String comment) { + this.comment = comment; + return this; + } + + public Builder setTableProperties(Map properties) { + this.properties = new HashMap<>(properties); + return this; + } + + public StarRocksTable build() { + Preconditions.checkNotNull(database, "database can't be null"); + Preconditions.checkNotNull(table, "table can't be null"); + Preconditions.checkNotNull(schema, "schema can't be null"); + Preconditions.checkNotNull(tableType, "table type can't be null"); + + if (tableKeys != null) { + List tableKeyColumns = new ArrayList<>(tableKeys.size()); + for (String name : tableKeys) { + StarRocksColumn column = schema.getColumn(name); + Preconditions.checkNotNull(column, + String.format("%s.%s does not contain column %s", database, table, name)); + tableKeyColumns.add(column); + } + tableKeyColumns.sort(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition)); + tableKeys = tableKeyColumns.stream().map(StarRocksColumn::getName).collect(Collectors.toList()); + } + + return new StarRocksTable(database, table, schema, tableType, tableKeys, partitionKeys, + numBuckets, comment, properties); + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java new file mode 100644 index 00000000..6075f0e5 --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java @@ -0,0 +1,228 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class StarRocksUtils { + + private static final String TABLE_SCHEMA_QUERY = + "SELECT `COLUMN_NAME`, `DATA_TYPE`, `ORDINAL_POSITION`, `COLUMN_SIZE`, `DECIMAL_DIGITS`, " + + "`COLUMN_DEFAULT`, `IS_NULLABLE`, `COLUMN_KEY` FROM `information_schema`.`COLUMNS` " + + "WHERE `TABLE_SCHEMA`=? AND `TABLE_NAME`=?;"; + + public static StarRocksSchema getStarRocksSchema(Connection connection, String database, String table) throws Exception { + StarRocksSchema.Builder schemaBuilder = new StarRocksSchema.Builder(); + try (PreparedStatement statement = connection.prepareStatement(TABLE_SCHEMA_QUERY)) { + statement.setObject(1, database); + statement.setObject(2, table); + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + String name = resultSet.getString("COLUMN_NAME"); + String type = resultSet.getString("DATA_TYPE"); + int position = resultSet.getInt("ORDINAL_POSITION"); + Integer size = resultSet.getInt("COLUMN_SIZE"); + if (resultSet.wasNull()) { + size = null; + } + Integer scale = resultSet.getInt("DECIMAL_DIGITS"); + if (resultSet.wasNull()) { + scale = null; + } + String defaultValue = resultSet.getString("COLUMN_DEFAULT"); + String isNullable = resultSet.getString("IS_NULLABLE"); + String columnKey = resultSet.getString("COLUMN_KEY"); + StarRocksColumn column = new StarRocksColumn.Builder() + .setName(name) + .setOrdinalPosition(position - 1) + .setType(type) + .setKey(columnKey) + .setSize(size) + .setScale(scale) + .setDefaultValue(defaultValue) + .setNullable(isNullable == null || !isNullable.equalsIgnoreCase("NO")) + .setComment(null) + .build(); + schemaBuilder.addColumn(column); + } + } + } + + return schemaBuilder.build(); + } + + public static List getPrimaryKeys(StarRocksSchema starRocksSchema) { + return starRocksSchema.getColumns().stream() + .filter(column -> "PRI".equalsIgnoreCase(column.getKey())) + // In StarRocks, primary keys must be declared in the same order of that in schema + .sorted(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition)) + .map(StarRocksColumn::getName) + .collect(Collectors.toList()); + } + + public static StarRocksTable getStarRocksTable(Connection connection, String database, String table) throws Exception { + StarRocksSchema starRocksSchema = StarRocksUtils.getStarRocksSchema(connection, database, table); + // could be empty if this is not a primary key table + List primaryKeys = StarRocksUtils.getPrimaryKeys(starRocksSchema); + StarRocksTable.TableType tableType = primaryKeys.isEmpty() ? + StarRocksTable.TableType.UNKNOWN : StarRocksTable.TableType.PRIMARY; + return new StarRocksTable.Builder() + .setDatabase(database) + .setTable(table) + .setSchema(starRocksSchema) + .setTableType(tableType) + .setTableKeys(primaryKeys.isEmpty() ? null : primaryKeys) + .build(); + } + + public static StarRocksTable toStarRocksTable( + String catalogName, + ObjectPath tablePath, + Configuration tableBaseConfig, + CatalogBaseTable flinkTable) { + TableSchema flinkSchema = flinkTable.getSchema(); + if (!flinkSchema.getPrimaryKey().isPresent()) { + throw new CatalogException( + String.format("Catalog %s can't create a non primary key table %s.", + catalogName, tablePath.getFullName())); + } + RowType rowType = (RowType) flinkSchema.toPhysicalRowDataType().getLogicalType(); + StarRocksSchema.Builder starRocksSchemaBuilder = new StarRocksSchema.Builder(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + RowType.RowField field = rowType.getFields().get(i); + StarRocksColumn.Builder columnBuilder = + new StarRocksColumn.Builder() + .setName(field.getName()) + .setOrdinalPosition(i); + TypeUtils.toStarRocksType(columnBuilder, field.getType()); + starRocksSchemaBuilder.addColumn(columnBuilder.build()); + } + StarRocksSchema starRocksSchema = starRocksSchemaBuilder.build(); + StarRocksTable.Builder starRocksTableBuilder = new StarRocksTable.Builder() + .setDatabase(tablePath.getDatabaseName()) + .setTable(tablePath.getObjectName()) + .setSchema(starRocksSchema); + + List primaryKeys = flinkSchema.getPrimaryKey() + .map(pk -> pk.getColumns()) + .orElse(Collections.emptyList()); + // have verified it's a primary key table above + Preconditions.checkState(!primaryKeys.isEmpty()); + starRocksTableBuilder.setTableType(StarRocksTable.TableType.PRIMARY); + starRocksTableBuilder.setTableKeys(primaryKeys); + if (tableBaseConfig.contains(CatalogOptions.TABLE_NUM_BUCKETS)) { + starRocksTableBuilder.setNumBuckets(tableBaseConfig.get(CatalogOptions.TABLE_NUM_BUCKETS)); + } + starRocksTableBuilder.setComment(flinkTable.getComment()); + starRocksTableBuilder.setTableProperties( + ConfigUtils.getPrefixConfigs( + CatalogOptions.TABLE_PROPERTIES_PREFIX, + tableBaseConfig.toMap(), + true + ) + ); + return starRocksTableBuilder.build(); + } + + public static String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) { + Preconditions.checkState(table.getTableType() == StarRocksTable.TableType.PRIMARY); + StringBuilder builder = new StringBuilder(); + builder.append( + String.format("CREATE TABLE %s`%s`.`%s`", + ignoreIfExists ? "IF NOT EXISTS " : "", + table.getDatabase(), + table.getTable()) + ); + builder.append(" (\n"); + StarRocksSchema schema = table.getSchema(); + String columnsStmt = schema.getColumns().stream().map(StarRocksUtils::buildColumnStatement) + .collect(Collectors.joining(",\n")); + builder.append(columnsStmt); + builder.append("\n) "); + String primaryKeys = table.getTableKeys().stream() + .map(pk -> "`" + pk + "`").collect(Collectors.joining(", ")); + builder.append(String.format("PRIMARY KEY (%s)\n", primaryKeys)); + builder.append(String.format("DISTRIBUTED BY HASH (%s)", primaryKeys)); + if (table.getNumBuckets() != null) { + builder.append(" BUCKETS "); + builder.append(table.getNumBuckets()); + } + if (table.getProperties() != null && !table.getProperties().isEmpty()) { + builder.append("\nPROPERTIES (\n"); + String properties = table.getProperties().entrySet().stream() + .map(entry -> String.format("\"%s\" = \"%s\"", entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",\n")); + builder.append(properties); + builder.append("\n)"); + } + builder.append(";"); + return builder.toString(); + } + + public static String buildColumnStatement(StarRocksColumn column) { + StringBuilder builder = new StringBuilder(); + builder.append("`"); + builder.append(column.getName()); + builder.append("` "); + builder.append(getFullColumnType(column.getType(), column.getSize(), column.getScale())); + builder.append(" "); + builder.append(column.isNullable() ? "NULL" : "NOT NULL"); + if (column.getDefaultValue() != null) { + builder.append(" DEFAULT '"); + builder.append(column.getDefaultValue()); + builder.append("'"); + } + if (column.getComment() != null) { + builder.append(" COMMENT \""); + builder.append(column.getComment()); + builder.append("\""); + } + return builder.toString(); + } + + public static String getFullColumnType(String type, @Nullable Integer size, @Nullable Integer scale) { + String dataType = type.toUpperCase(); + switch (dataType) { + case "DECIMAL": + return String.format("DECIMAL(%d, %s)", size, scale); + case "CHAR": + case "VARCHAR": + return String.format("%s(%d)", dataType, size); + default: + return dataType; + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java new file mode 100644 index 00000000..a471886a --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java @@ -0,0 +1,233 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +public class TypeUtils { + + public static final String BOOLEAN = "BOOLEAN"; + public static final String TINYINT = "TINYINT"; + public static final String SMALLINT = "SMALLINT"; + public static final String INT = "INT"; + public static final String BIGINT = "BIGINT"; + public static final String LARGEINT = "BIGINT UNSIGNED"; + public static final String FLOAT = "FLOAT"; + public static final String DOUBLE = "DOUBLE"; + public static final String DECIMAL = "DECIMAL"; + public static final String CHAR = "CHAR"; + public static final String VARCHAR = "VARCHAR"; + public static final String STRING = "STRING"; + public static final String DATE = "DATE"; + public static final String DATETIME = "DATETIME"; + public static final String JSON = "JSON"; + + public static final int MAX_VARCHAR_SIZE = 1048576; + public static final int STRING_SIZE = 65533; + + public static DataType toFlinkType( + String starRocksType, + @Nullable Integer precision, + @Nullable Integer scale, + boolean isNull) { + switch (starRocksType.toUpperCase()) { + case BOOLEAN: + return wrapNull(DataTypes.BOOLEAN(), isNull); + case TINYINT: + // mysql does not have boolean type, and starrocks `information_schema`.`COLUMNS` will return + // a "tinyint" data type for both StarRocks BOOLEAN and TINYINT type, We distinguish them by + // column size, and the size of BOOLEAN is null + return precision == null + ? wrapNull(DataTypes.BOOLEAN(), isNull) + : wrapNull(DataTypes.TINYINT(), isNull); + case SMALLINT: + return wrapNull(DataTypes.SMALLINT(), isNull); + case INT: + return wrapNull(DataTypes.INT(), isNull); + case BIGINT: + return wrapNull(DataTypes.BIGINT(), isNull); + case LARGEINT: + return wrapNull(DataTypes.STRING(), isNull); + case FLOAT: + return wrapNull(DataTypes.FLOAT(), isNull); + case DOUBLE: + return wrapNull(DataTypes.DOUBLE(), isNull); + case DECIMAL: + Preconditions.checkNotNull(precision, "Precision for StarRocks DECIMAL can't be null."); + Preconditions.checkNotNull(scale, "Scale for StarRocks DECIMAL can't be null."); + return wrapNull(DataTypes.DECIMAL(precision, scale), isNull); + case CHAR: + Preconditions.checkNotNull(precision, "Precision for StarRocks CHAR can't be null."); + return wrapNull(DataTypes.CHAR(precision), isNull); + case VARCHAR: + Preconditions.checkNotNull(precision, "Precision for StarRocks VARCHAR can't be null."); + return wrapNull(DataTypes.VARCHAR(precision), isNull); + case STRING: + return wrapNull(DataTypes.STRING(), isNull); + case DATE: + return wrapNull(DataTypes.DATE(), isNull); + case DATETIME: + return wrapNull(DataTypes.TIMESTAMP(0), isNull); + case JSON: + return wrapNull(DataTypes.STRING(), isNull); + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported StarRocks type %s when mapping StarRocks and Flink tables via Catalog. " + + "You can try to create table directly if you want to map this StarRocks type.", + starRocksType)); + } + } + + private static DataType wrapNull(DataType dataType, boolean isNull) { + return isNull ? dataType.nullable() : dataType.notNull(); + } + + public static void toStarRocksType(StarRocksColumn.Builder builder, LogicalType flinkType) { + flinkType.accept(new FlinkLogicalTypeVisitor(builder)); + } + + private static class FlinkLogicalTypeVisitor extends LogicalTypeDefaultVisitor { + + private final StarRocksColumn.Builder builder; + + public FlinkLogicalTypeVisitor(StarRocksColumn.Builder builder) { + this.builder = builder; + } + + @Override + public StarRocksColumn.Builder visit(CharType charType) { + builder.setType(CHAR); + builder.setSize(charType.getLength()); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(VarCharType varCharType) { + builder.setType(VARCHAR); + builder.setSize(Math.min(varCharType.getLength(), MAX_VARCHAR_SIZE)); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(BooleanType booleanType) { + builder.setType(BOOLEAN); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(DecimalType decimalType) { + builder.setType(DECIMAL); + builder.setSize(decimalType.getPrecision()); + builder.setScale(decimalType.getScale()); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(TinyIntType tinyIntType) { + builder.setType(TINYINT); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(SmallIntType smallIntType) { + builder.setType(SMALLINT); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(IntType intType) { + builder.setType(INT); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(BigIntType bigIntType) { + builder.setType(BIGINT); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(FloatType floatType) { + builder.setType(FLOAT); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(DoubleType doubleType) { + builder.setType(DOUBLE); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(DateType dateType) { + builder.setType(DATE); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(TimestampType timestampType) { + builder.setType(DATETIME); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(ZonedTimestampType zonedTimestampType) { + builder.setType(DATETIME); + return builder; + } + + @Override + public StarRocksColumn.Builder visit(LocalZonedTimestampType localZonedTimestampType) { + builder.setType(DATETIME); + return builder; + } + + @Override + protected StarRocksColumn.Builder defaultMethod(LogicalType logicalType) { + throw new UnsupportedOperationException( + String.format( + "Unsupported StarRocks type %s when mapping StarRocks and Flink tables via Catalog. " + + "You can try to create table directly if you want to map this Flink type.", + logicalType.toString())); + } + } +} diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java index f119f99b..6006e012 100644 --- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java +++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java @@ -19,7 +19,7 @@ import com.starrocks.connector.flink.table.source.struct.QueryBeXTablets; import com.starrocks.connector.flink.table.source.struct.QueryInfo; import com.starrocks.connector.flink.table.source.struct.QueryPlan; - +import com.starrocks.connector.flink.tools.JsonWrapper; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -137,8 +137,7 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock LOG.warn("Request failed with empty response."); throw new RuntimeException("Request failed with empty response." + requsetCode); } - JSONObject jsonObject = JSONObject.parseObject(respString); - return JSONObject.toJavaObject(jsonObject, QueryPlan.class); + return new JsonWrapper().parseObject(respString, QueryPlan.class); } private static String getBasicAuthHeader(String username, String password) { diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java index af4b61b2..39f858d1 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java @@ -29,8 +29,14 @@ public class StarRocksDynamicTableSinkFactory implements DynamicTableSinkFactory @Override public DynamicTableSink createDynamicTableSink(Context context) { + return createDynamicTableSink(context, true); + } + + public DynamicTableSink createDynamicTableSink(Context context, boolean needValidate) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX); + if (needValidate) { + helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX); + } ReadableConfig options = helper.getOptions(); // validate some special properties StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options, context.getCatalogTable().getOptions()); diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java index 660c1d41..e340b2cd 100644 --- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksDynamicTableSourceFactory.java @@ -14,6 +14,7 @@ package com.starrocks.connector.flink.table.source; +import com.starrocks.connector.flink.table.source.struct.PushDownHolder; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableSchema; @@ -25,15 +26,19 @@ import java.util.HashSet; import java.util.Set; -import com.starrocks.connector.flink.table.source.struct.PushDownHolder; - public final class StarRocksDynamicTableSourceFactory implements DynamicTableSourceFactory { @Override public DynamicTableSource createDynamicTableSource(Context context) { + return createDynamicTableSource(context, true); + } + + public DynamicTableSource createDynamicTableSource(Context context, boolean needValidate) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validateExcept(StarRocksSourceOptions.SOURCE_PROPERTIES_PREFIX); + if (needValidate) { + helper.validateExcept(StarRocksSourceOptions.SOURCE_PROPERTIES_PREFIX); + } ReadableConfig options = helper.getOptions(); // validate some special properties StarRocksSourceOptions sourceOptions = new StarRocksSourceOptions(options, context.getCatalogTable().getOptions()); @@ -42,7 +47,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { return new StarRocksDynamicTableSource(sourceOptions, flinkSchema, pushDownHolder); } - @Override public String factoryIdentifier() { return "starrocks"; diff --git a/src/main/java/com/starrocks/connector/flink/tools/JsonWrapper.java b/src/main/java/com/starrocks/connector/flink/tools/JsonWrapper.java index ca922bd9..9c4b7c78 100644 --- a/src/main/java/com/starrocks/connector/flink/tools/JsonWrapper.java +++ b/src/main/java/com/starrocks/connector/flink/tools/JsonWrapper.java @@ -71,6 +71,10 @@ public Object parse(String text) { public T parseObject(byte[] bytes, Type clazz) { return JSON.parseObject(bytes, 0, bytes.length, IOUtils.UTF8, clazz, parserConfig, null, DEFAULT_PARSER_FEATURE); } + + public T parseObject(String text, Type clazz) { + return JSON.parseObject(text, clazz, parserConfig, null, DEFAULT_PARSER_FEATURE); + } } final class BinaryStringDataSerializer implements ObjectSerializer, Serializable { diff --git a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 99262195..cabc60f4 100644 --- a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -11,4 +11,5 @@ # limitations under the License. com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSinkFactory -com.starrocks.connector.flink.table.source.StarRocksDynamicTableSourceFactory \ No newline at end of file +com.starrocks.connector.flink.table.source.StarRocksDynamicTableSourceFactory +com.starrocks.connector.flink.catalog.StarRocksCatalogFactory \ No newline at end of file diff --git a/src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java b/src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java new file mode 100644 index 00000000..a3a3c7e8 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StarRocksCatalogFactoryTest { + + @Test + public void testCreateCatalog() { + final Map options = new HashMap<>(); + options.put("type", "starrocks"); + options.put("jdbc-url", "jdbc:mysql://127.0.0.1:11903"); + options.put("http-url", "127.0.0.1:11901"); + options.put("default-database", "starrocks_db"); + options.put("username", "root"); + options.put("password", "123456"); + options.put("scan.connect.timeout-ms", "2000"); + options.put("scan.params.batch-rows", "8192"); + options.put("sink.semantic", "exactly-once"); + options.put("sink.properties.format", "json"); + options.put("table.num-buckets", "10"); + options.put("table.properties.replication_num", "1"); + + Catalog catalog = FactoryUtil.createCatalog( + "test_catalog", + options, + null, + Thread.currentThread().getContextClassLoader() + ); + assertTrue(catalog instanceof StarRocksCatalog); + StarRocksCatalog starRocksCatalog = (StarRocksCatalog) catalog; + assertEquals("test_catalog", starRocksCatalog.getName()); + assertEquals("starrocks_db", starRocksCatalog.getDefaultDatabase()); + assertEquals("jdbc:mysql://127.0.0.1:11903", starRocksCatalog.getJdbcUrl()); + assertEquals("127.0.0.1:11901", starRocksCatalog.getHttpUrl()); + assertEquals("root", starRocksCatalog.getUsername()); + assertEquals("123456", starRocksCatalog.getPassword()); + + Map sourceBaseConfig = starRocksCatalog.getSourceBaseConfig().toMap(); + assertEquals(2, sourceBaseConfig.size()); + assertEquals("2000", sourceBaseConfig.get("scan.connect.timeout-ms")); + assertEquals("8192", sourceBaseConfig.get("scan.params.batch-rows")); + + Map sinkBaseConfig = starRocksCatalog.getSinkBaseConfig().toMap(); + assertEquals(2, sinkBaseConfig.size()); + assertEquals("exactly-once", sinkBaseConfig.get("sink.semantic")); + assertEquals("json", sinkBaseConfig.get("sink.properties.format")); + + Map tableBaseConfig = starRocksCatalog.getTableBaseConfig().toMap(); + assertEquals(2, tableBaseConfig.size()); + assertEquals("10", tableBaseConfig.get("table.num-buckets")); + assertEquals("1", tableBaseConfig.get("table.properties.replication_num")); + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java index c935d13c..f80a9059 100644 --- a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java +++ b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java @@ -1,4 +1,6 @@ /* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,13 +20,10 @@ package com.starrocks.connector.flink.it; -import com.esotericsoftware.minlog.Log; -import com.starrocks.connector.flink.container.StarRocksCluster; -import org.apache.flink.types.Row; -import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,37 +31,62 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; -import java.util.function.Function; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; -import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; -/** Abstract IT case class for StarRocks. */ -public abstract class StarRocksITTestBase extends TestLogger { +public abstract class StarRocksITTestBase { private static final Logger LOG = LoggerFactory.getLogger(StarRocksITTestBase.class); - public static StarRocksCluster STARROCKS_CLUSTER = - new StarRocksCluster(1, 0, 3); + private static final boolean DEBUG_MODE = false; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + protected static String DB_NAME; + protected static String HTTP_URLS; + protected static String JDBC_URLS; + + protected static String getHttpUrls() { + return HTTP_URLS; + } + + protected static String getJdbcUrl() { + return JDBC_URLS; + } protected static Connection DB_CONNECTION; + protected static Set DATABASE_SET_TO_CLEAN; + @BeforeClass public static void setUp() throws Exception { + HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls"); + JDBC_URLS = DEBUG_MODE ? "jdbc:mysql://127.0.0.1:11903" : System.getProperty("jdbc_urls"); + assumeTrue(HTTP_URLS != null && JDBC_URLS != null); + + DB_NAME = "sr_test_" + genRandomUuid(); try { - STARROCKS_CLUSTER.start(); - LOG.info("StarRocks cluster is started."); + DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); + LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); } catch (Exception e) { - LOG.error("Failed to star StarRocks cluster.", e); + LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); throw e; } - Log.info("StarRocks cluster try to connect."); + + DATABASE_SET_TO_CLEAN = new HashSet<>(); try { - DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); - LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); + DATABASE_SET_TO_CLEAN.add(DB_NAME); + createDatabase(DB_NAME); + LOG.info("Successful to create database {}", DB_NAME); } catch (Exception e) { - LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); + LOG.error("Failed to create database {}", DB_NAME, e); throw e; } } @@ -70,92 +94,47 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { if (DB_CONNECTION != null) { + for (String database : DATABASE_SET_TO_CLEAN) { + try { + String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", database); + executeSrSQL(dropDb); + LOG.info("Successful to drop database {}", DB_NAME); + } catch (Exception e) { + LOG.error("Failed to drop database {}", DB_NAME, e); + } + } DB_CONNECTION.close(); - LOG.info("Close db connection"); - } - - if (STARROCKS_CLUSTER != null) { - STARROCKS_CLUSTER.stop(); - STARROCKS_CLUSTER.close(); - Log.info("Stop and close StarRocks cluster."); } } - protected static String getJdbcUrl() { - return "jdbc:mysql://" + STARROCKS_CLUSTER.getQueryUrls(); + protected static String genRandomUuid() { + return UUID.randomUUID().toString().replace("-", "_"); } - /** - * Query the data in the table, and compare the result with the expected data. - * - *

For example, we can call this method as follows: - * verifyResult( - * new Row[] { - * Row.of("1", 1.4, Timestamp.valueOf("2022-09-06 20:00:00.000")), - * Row.of("2", 2.4, Timestamp.valueOf("2022-09-06 20:00:00.000")), - * }, - * "select * from db.table" - * ); - */ - public static void verifyResult(Row[] expectedData, String sql) throws Exception { - ArrayList resultData = getSQLResult(sql); - LOG.info("resultData: {}",resultData); - LOG.info("expectedData: {}",Arrays.asList(expectedData)); - assertEquals(expectedData.length, resultData.size()); - for (int i = 0; i < expectedData.length; i++) { - assertEquals(expectedData[i].toString(), resultData.get(i).toString()); + protected static void createDatabase(String database) throws Exception { + try (Statement statement = DB_CONNECTION.createStatement()) { + DATABASE_SET_TO_CLEAN.add(database); + statement.executeUpdate(String.format("CREATE DATABASE %s;", database)); } } - public static ArrayList getSQLResult(String sql) throws Exception { - if (!DB_CONNECTION.isValid(3000)) { - reestablishConnection(); - } - ArrayList resultData = new ArrayList<>(); - try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql); - ResultSet resultSet = statement.executeQuery()) { - int columnCount = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) { - Row row = new Row(columnCount); - for (int i = 0; i < columnCount; i++) { - row.setField(i, resultSet.getObject(i + 1)); + protected static List listDatabase() throws Exception { + List databaseList = new ArrayList<>(); + try (Statement statement = DB_CONNECTION.createStatement()) { + try (ResultSet resultSet = statement.executeQuery( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;")) { + while (resultSet.next()) { + String columnValue = resultSet.getString(1); + databaseList.add(columnValue); } - resultData.add(row); } } - return resultData; - } - - private static void reestablishConnection() throws Exception{ - try { - DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); - LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); - } catch (Exception e) { - LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); - throw e; - } + return databaseList; } - public static void verifyResult(Function verifyFunction, String sql) throws Exception { - ArrayList resultData = new ArrayList<>(); - if (!DB_CONNECTION.isValid(3000)) { - reestablishConnection(); - } - try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql); - ResultSet resultSet = statement.executeQuery()) { - int columnCount = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) { - Row row = new Row(columnCount); - for (int i = 0; i < columnCount; i++) { - row.setField(i, resultSet.getObject(i + 1)); - } - resultData.add(row); - } - } - if (!verifyFunction.apply(resultData.toArray(new Row[0]))) - { - LOG.info("verifyResult is : {}", resultData.toString()); - Assert.fail(); + protected static void executeSrSQL(String sql) throws Exception { + try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) { + statement.execute(); } } } diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java new file mode 100644 index 00000000..9dcfb5c3 --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java @@ -0,0 +1,386 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.catalog; + +import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.it.StarRocksITTestBase; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static com.starrocks.connector.flink.catalog.TypeUtils.MAX_VARCHAR_SIZE; +import static com.starrocks.connector.flink.catalog.TypeUtils.STRING_SIZE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StarRocksCatalogTest extends StarRocksITTestBase { + + @Test + public void testDatabase() throws Exception { + StarRocksCatalog catalog = createCatalog("test_db_catalog"); + String db1 = "sr_test_" + genRandomUuid(); + String db2 = "sr_test_" + genRandomUuid(); + String db3 = "sr_test_" + genRandomUuid(); + createDatabase(db1); + createDatabase(db2); + + List expectDbList = listDatabase(); + expectDbList.sort(String::compareTo); + List actualDbList = catalog.listDatabases(); + actualDbList.sort(String::compareTo); + assertEquals(expectDbList, actualDbList); + + assertNotNull(catalog.getDatabase(db1)); + assertNotNull(catalog.getDatabase(db2)); + try { + catalog.getDatabase(db3); + fail("Should fail because the db does not exist"); + } catch (Exception e) { + assertTrue(e instanceof DatabaseNotExistException); + } + + assertTrue(catalog.databaseExists(db1)); + assertTrue(catalog.databaseExists(db2)); + assertFalse(catalog.databaseExists(db3)); + + DATABASE_SET_TO_CLEAN.add(db3); + catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false); + assertTrue(catalog.databaseExists(db3)); + // test ignoreIfExists is true for already existed db + catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + try { + catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false); + fail("Should fail because the db already exists"); + } catch (Exception e) { + assertTrue(e instanceof DatabaseAlreadyExistException); + } + } + + @Test + public void testListAndExistTable() throws Exception { + StarRocksCatalog catalog = createCatalog("test_table_catalog"); + + String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + createDatabase(db); + assertTrue(catalog.databaseExists(db)); + + String tbl1 = createAllTypesTable(db, "tbl1"); + String tbl2 = createAllTypesTable(db, "tbl2"); + String tbl3 = "tbl3_" + genRandomUuid(); + + List expectTables = Arrays.asList(tbl1, tbl2); + expectTables.sort(String::compareTo); + List actualTables = catalog.listTables(db); + actualTables.sort(String::compareTo); + assertEquals(expectTables, actualTables); + String dbNotExist = "sr_test_" + genRandomUuid(); + try { + catalog.listTables(dbNotExist); + fail("Should failed because the db doest not exist"); + } catch (Exception e) { + assertTrue(e instanceof DatabaseNotExistException); + } + + assertTrue(catalog.tableExists(new ObjectPath(db, tbl1))); + assertTrue(catalog.tableExists(new ObjectPath(db, tbl2))); + assertFalse(catalog.tableExists(new ObjectPath(db, tbl3))); + assertFalse(catalog.tableExists(new ObjectPath(dbNotExist, tbl1))); + } + + @Test + public void testGetTable() throws Exception { + StarRocksCatalog catalog = createCatalog("test_table_catalog"); + + String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + assertTrue(catalog.databaseExists(db)); + + String tbl = createAllTypesTable(db, "tbl"); + Schema flinkSchema = createAllTypesFlinkSchema().toSchema(); + + assertTrue(catalog.tableExists(new ObjectPath(db, tbl))); + CatalogBaseTable table = catalog.getTable(new ObjectPath(db, tbl)); + assertNotNull(table); + + Schema.Builder schemaBuilder = Schema.newBuilder(); + for (int i = 0; i < 11; i++) { + Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i); + schemaBuilder.column(column.getName(), column.getDataType()); + } + schemaBuilder.column("c11", DataTypes.VARCHAR(STRING_SIZE)); + for (int i = 12; i < 15; i++) { + Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i); + schemaBuilder.column(column.getName(), column.getDataType()); + } + schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumnNames()); + Schema expectFlinkSchema = schemaBuilder.build(); + Schema actualFlinkSchema = table.getUnresolvedSchema(); + + assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); + assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); + assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), + actualFlinkSchema.getPrimaryKey().get().getColumnNames()); + + String tblNotExist = "tbl_" + genRandomUuid(); + try { + catalog.getTable(new ObjectPath(db, tblNotExist)); + fail("Should fail because the table does not exist"); + } catch (Exception e) { + assertTrue(e instanceof TableNotExistException); + } + } + + @Test + public void testCreateTable() throws Exception { + Configuration tableConf = new Configuration(); + tableConf.setString("table.properties.replication_num", "1"); + StarRocksCatalog catalog = createCatalog("test_table_catalog", tableConf); + + String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + assertTrue(catalog.databaseExists(db)); + String tbl = "tbl_" + genRandomUuid(); + ObjectPath objectPath = new ObjectPath(db, tbl); + TableSchema flinkSchema = createAllTypesFlinkSchema(); + CatalogBaseTable catalogTable = new CatalogTableImpl(flinkSchema, Collections.emptyMap(), null); + assertFalse(catalog.tableExists(objectPath)); + catalog.createTable(objectPath, catalogTable, false); + assertTrue(catalog.tableExists(objectPath)); + CatalogBaseTable actualCatalogTable = catalog.getTable(objectPath); + List tableColumns = new ArrayList<>(flinkSchema.getTableColumns()); + tableColumns.set(5, TableColumn.physical(tableColumns.get(5).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + tableColumns.set(11, TableColumn.physical(tableColumns.get(11).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + tableColumns.set(14, TableColumn.physical(tableColumns.get(14).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + TableSchema.Builder schemaBuilder = new TableSchema.Builder(); + tableColumns.forEach(schemaBuilder::add); + schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumns().toArray(new String[0])); + Schema expectFlinkSchema = schemaBuilder.build().toSchema(); + Schema actualFlinkSchema = actualCatalogTable.getUnresolvedSchema(); + assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); + assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); + assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), + actualFlinkSchema.getPrimaryKey().get().getColumnNames()); + + try { + catalog.createTable(objectPath, catalogTable, false); + fail("Should fail because the table exists"); + } catch (Exception e) { + assertTrue(e instanceof TableAlreadyExistException); + } + + String dbNotExist = "sr_test_" + genRandomUuid(); + ObjectPath objectPathNotExist = new ObjectPath(dbNotExist, tbl); + try { + catalog.createTable(objectPathNotExist, catalogTable, false); + fail("Should fail because the database does not exist"); + } catch (Exception e) { + assertTrue(e instanceof DatabaseNotExistException); + } + } + + @Test + public void testDropTable() throws Exception { + StarRocksCatalog catalog = createCatalog("test_table_catalog"); + + String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + assertTrue(catalog.databaseExists(db)); + String tbl = createAllTypesTable(db, "tbl"); + + ObjectPath objectPath = new ObjectPath(db, tbl); + assertTrue(catalog.tableExists(objectPath)); + catalog.dropTable(objectPath, false); + assertFalse(catalog.tableExists(objectPath)); + // test ignoreIfNotExists + catalog.dropTable(objectPath, true); + try { + catalog.dropTable(objectPath, false); + fail("Should fail because the table does not exist"); + } catch (Exception e) { + assertTrue(e instanceof TableNotExistException); + } + } + + private String createAllTypesTable(String database, String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createSql = String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 BOOLEAN," + + "c1 TINYINT," + + "c2 SMALLINT," + + "c3 INT," + + "c4 BIGINT," + + "c5 LARGEINT," + + "c6 FLOAT," + + "c7 DOUBLE," + + "c8 DECIMAL(23,5)," + + "c9 CHAR(7)," + + "c10 VARCHAR(109)," + + "c11 STRING," + + "c12 DATE," + + "c13 DATETIME," + + "c14 JSON" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0,c1,c2,c3) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + database, tableName); + executeSrSQL(createSql); + return tableName; + } + + private TableSchema createAllTypesFlinkSchema() { + return new TableSchema.Builder() + .field("c0", DataTypes.BOOLEAN().notNull()) + .field("c1", DataTypes.TINYINT().notNull()) + .field("c2", DataTypes.SMALLINT().notNull()) + .field("c3", DataTypes.INT().notNull()) + .field("c4", DataTypes.BIGINT()) + .field("c5", DataTypes.STRING()) + .field("c6", DataTypes.FLOAT()) + .field("c7", DataTypes.DOUBLE()) + .field("c8", DataTypes.DECIMAL(23, 5)) + .field("c9", DataTypes.CHAR(7)) + .field("c10", DataTypes.VARCHAR(109)) + .field("c11", DataTypes.STRING()) + .field("c12", DataTypes.DATE()) + .field("c13", DataTypes.TIMESTAMP(0)) + .field("c14", DataTypes.STRING()) + .primaryKey("c0", "c1", "c2", "c3") + .build(); + } + + private StarRocksCatalog createCatalog(String catalogName) { + return createCatalog(catalogName, new Configuration()); + } + + private StarRocksCatalog createCatalog(String catalogName, Configuration tableConf) { + return new StarRocksCatalog( + catalogName, + getJdbcUrl(), + getHttpUrls(), + "root", + "", + DB_NAME, + new Configuration(), + new Configuration(), + tableConf, + Thread.currentThread().getContextClassLoader() + ); + } + + @Test + public void testInsertAndSelect() throws Exception { + String tableName = createPkTable("test_read_write"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + env.setParallelism(1); + + String catalogName = "test_catalog"; + String createCatalogSql = String.format( + "CREATE CATALOG %s WITH (" + + " 'type' = 'starrocks',\n" + + " 'jdbc-url' = '%s',\n" + + " 'http-url' = '%s',\n" + + " 'username' = '%s',\n" + + " 'password' = '%s',\n" + + " 'default-database' = '%s'" + + ");", + catalogName, getJdbcUrl(), getHttpUrls(), "root", "", "default-db"); + tEnv.executeSql(createCatalogSql); + tEnv.executeSql(String.format("USE CATALOG %s;", catalogName)); + tEnv.executeSql(String.format("USE %s;", DB_NAME)); + tEnv.executeSql( + String.format( + "INSERT INTO %s VALUES (1, '100'), (2, '200');", + tableName + )) + .await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "SELECT * FROM %s", + tableName)) + .execute() + .collect()); + results.sort(Comparator.comparingInt(row -> (int) row.getField(0))); + + List expectRows = Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, "100"), + Row.ofKind(RowKind.INSERT, 2, "200") + ); + + assertThat(results).isEqualTo(expectRows); + } + + private String createPkTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 STRING" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTest.java b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java similarity index 99% rename from src/test/java/com/starrocks/connector/flink/it/StarRocksITTest.java rename to src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java index 3af07f6a..499ba158 100644 --- a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTest.java @@ -1,4 +1,6 @@ /* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +18,7 @@ * limitations under the License. */ -package com.starrocks.connector.flink.it; +package com.starrocks.connector.flink.it.container; import com.starrocks.connector.flink.StarRocksSink; import com.starrocks.connector.flink.StarRocksSource; diff --git a/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTestBase.java new file mode 100644 index 00000000..f787a13b --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/container/StarRocksITTestBase.java @@ -0,0 +1,163 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.container; + +import com.esotericsoftware.minlog.Log; +import com.starrocks.connector.flink.container.StarRocksCluster; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + +/** Abstract IT case class for StarRocks. */ +public abstract class StarRocksITTestBase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksITTestBase.class); + + public static StarRocksCluster STARROCKS_CLUSTER = + new StarRocksCluster(1, 0, 3); + + protected static Connection DB_CONNECTION; + + @BeforeClass + public static void setUp() throws Exception { + try { + STARROCKS_CLUSTER.start(); + LOG.info("StarRocks cluster is started."); + } catch (Exception e) { + LOG.error("Failed to star StarRocks cluster.", e); + throw e; + } + Log.info("StarRocks cluster try to connect."); + try { + DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); + LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); + } catch (Exception e) { + LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); + throw e; + } + } + + @AfterClass + public static void tearDown() throws Exception { + if (DB_CONNECTION != null) { + DB_CONNECTION.close(); + LOG.info("Close db connection"); + } + + if (STARROCKS_CLUSTER != null) { + STARROCKS_CLUSTER.stop(); + STARROCKS_CLUSTER.close(); + Log.info("Stop and close StarRocks cluster."); + } + } + + protected static String getJdbcUrl() { + return "jdbc:mysql://" + STARROCKS_CLUSTER.getQueryUrls(); + } + + /** + * Query the data in the table, and compare the result with the expected data. + * + *

For example, we can call this method as follows: + * verifyResult( + * new Row[] { + * Row.of("1", 1.4, Timestamp.valueOf("2022-09-06 20:00:00.000")), + * Row.of("2", 2.4, Timestamp.valueOf("2022-09-06 20:00:00.000")), + * }, + * "select * from db.table" + * ); + */ + public static void verifyResult(Row[] expectedData, String sql) throws Exception { + ArrayList resultData = getSQLResult(sql); + LOG.info("resultData: {}",resultData); + LOG.info("expectedData: {}",Arrays.asList(expectedData)); + assertEquals(expectedData.length, resultData.size()); + for (int i = 0; i < expectedData.length; i++) { + assertEquals(expectedData[i].toString(), resultData.get(i).toString()); + } + } + + public static ArrayList getSQLResult(String sql) throws Exception { + if (!DB_CONNECTION.isValid(3000)) { + reestablishConnection(); + } + ArrayList resultData = new ArrayList<>(); + try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql); + ResultSet resultSet = statement.executeQuery()) { + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + Row row = new Row(columnCount); + for (int i = 0; i < columnCount; i++) { + row.setField(i, resultSet.getObject(i + 1)); + } + resultData.add(row); + } + } + return resultData; + } + + private static void reestablishConnection() throws Exception{ + try { + DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); + LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); + } catch (Exception e) { + LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); + throw e; + } + } + + public static void verifyResult(Function verifyFunction, String sql) throws Exception { + ArrayList resultData = new ArrayList<>(); + if (!DB_CONNECTION.isValid(3000)) { + reestablishConnection(); + } + try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql); + ResultSet resultSet = statement.executeQuery()) { + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + Row row = new Row(columnCount); + for (int i = 0; i < columnCount; i++) { + row.setField(i, resultSet.getObject(i + 1)); + } + resultData.add(row); + } + } + if (!verifyFunction.apply(resultData.toArray(new Row[0]))) + { + LOG.info("verifyResult is : {}", resultData.toString()); + Assert.fail(); + } + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index ff40e11f..661c53e6 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -18,6 +18,7 @@ package com.starrocks.connector.flink.it.sink; +import com.starrocks.connector.flink.it.StarRocksITTestBase; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; @@ -50,7 +51,7 @@ import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) -public class StarRocksSinkITTest extends StarRocksSinkITTestBase { +public class StarRocksSinkITTest extends StarRocksITTestBase { @Parameterized.Parameters(name = "sinkV2={0}") public static List parameters() { diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java deleted file mode 100644 index 652723dc..00000000 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTestBase.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.starrocks.connector.flink.it.sink; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.util.UUID; - -import static org.junit.Assume.assumeTrue; - -public abstract class StarRocksSinkITTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkITTestBase.class); - - private static final boolean DEBUG_MODE = false; - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - protected static String DB_NAME; - protected static String HTTP_URLS; - protected static String JDBC_URLS; - - protected static String getHttpUrls() { - return HTTP_URLS; - } - - protected static String getJdbcUrl() { - return JDBC_URLS; - } - - protected static Connection DB_CONNECTION; - @BeforeClass - public static void setUp() throws Exception { - HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls"); - JDBC_URLS = DEBUG_MODE ? "jdbc:mysql://127.0.0.1:11903" : System.getProperty("jdbc_urls"); - assumeTrue(HTTP_URLS != null && JDBC_URLS != null); - - DB_NAME = "sr_sink_test_" + genRandomUuid(); - try { - DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); - LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); - } catch (Exception e) { - LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); - throw e; - } - - try { - String createDb = "CREATE DATABASE " + DB_NAME; - executeSrSQL(createDb); - LOG.info("Successful to create database {}", DB_NAME); - } catch (Exception e) { - LOG.error("Failed to create database {}", DB_NAME, e); - throw e; - } - } - - @AfterClass - public static void tearDown() throws Exception { - if (DB_CONNECTION != null) { - try { - String dropDb = String.format("DROP DATABASE IF EXISTS %s FORCE", DB_NAME); - executeSrSQL(dropDb); - LOG.info("Successful to drop database {}", DB_NAME); - } catch (Exception e) { - LOG.error("Failed to drop database {}", DB_NAME, e); - } - DB_CONNECTION.close(); - } - } - - protected static String genRandomUuid() { - return UUID.randomUUID().toString().replace("-", "_"); - } - - protected static void executeSrSQL(String sql) throws Exception { - try (PreparedStatement statement = DB_CONNECTION.prepareStatement(sql)) { - statement.execute(); - } - } -} diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java index 924194ba..f7063564 100644 --- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceBaseTest.java @@ -14,26 +14,10 @@ package com.starrocks.connector.flink.it.source; -import static org.junit.Assert.assertEquals; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.HashMap; -import java.util.Map; - import com.alibaba.fastjson.JSONObject; import com.starrocks.connector.flink.table.source.StarRocksSourceOptions; import com.starrocks.connector.flink.table.source.StarrocksExternalServiceImpl; import com.starrocks.connector.flink.table.source.struct.SelectColumn; -import com.starrocks.thrift.TStarrocksExternalService; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol; import com.starrocks.shade.org.apache.thrift.server.TServer; import com.starrocks.shade.org.apache.thrift.server.TThreadPoolServer; @@ -41,9 +25,24 @@ import com.starrocks.shade.org.apache.thrift.transport.TServerTransport; import com.starrocks.shade.org.apache.thrift.transport.TTransportException; import com.starrocks.shade.org.apache.thrift.transport.TTransportFactory; +import com.starrocks.thrift.TStarrocksExternalService; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; import org.junit.After; import org.junit.Before; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + public abstract class StarRocksSourceBaseTest { @@ -218,7 +217,7 @@ protected void mockResonsefunc() { Map respMap = new HashMap<>(); respMap.put("opaqued_query_plan", "mockPlan"); respMap.put("status", "200"); - Map partitionsSet = new HashMap<>(); + Map partitionsSet = new HashMap<>(); for (int i = 0; i < tabletCount; i ++) { Map pMap = new HashMap<>(); pMap.put("version", 4); @@ -228,7 +227,7 @@ protected void mockResonsefunc() { beNode[i%beNode.length], beNode[i%beNode.length + 1 >= beNode.length ? 0 : i%beNode.length + 1] }); - partitionsSet.put(i, pMap); + partitionsSet.put(String.valueOf(i), pMap); } respMap.put("partitions", partitionsSet); JSONObject json = new JSONObject(respMap); @@ -241,7 +240,7 @@ protected void mockOneBeResonsefunc() { Map respMap = new HashMap<>(); respMap.put("opaqued_query_plan", "mockPlan"); respMap.put("status", "200"); - Map partitionsSet = new HashMap<>(); + Map partitionsSet = new HashMap<>(); for (int i = 0; i < tabletCount; i ++) { Map pMap = new HashMap<>(); pMap.put("version", 4); @@ -251,7 +250,7 @@ protected void mockOneBeResonsefunc() { beNode[i%beNode.length], beNode[i%beNode.length + 1 >= beNode.length ? 0 : i%beNode.length + 1] }); - partitionsSet.put(i, pMap); + partitionsSet.put(String.valueOf(i), pMap); } respMap.put("partitions", partitionsSet); JSONObject json = new JSONObject(respMap);