From d4733fff32c8f92e8ae6afc9a474f9608f784e57 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 27 Nov 2023 15:18:07 +0800 Subject: [PATCH] [Refactor] Abstract StarRocksCatalog to be used in Flink CDC 3.0 Signed-off-by: PengFei Li --- .../connector/flink/catalog/FlinkCatalog.java | 615 ++++++++++++ .../flink/catalog/StarRocksCatalog.java | 935 +++++++++--------- .../catalog/StarRocksCatalogException.java | 43 + .../catalog/StarRocksCatalogFactory.java | 2 +- .../flink/catalog/StarRocksColumn.java | 204 ++-- .../flink/catalog/StarRocksSchema.java | 71 -- .../flink/catalog/StarRocksTable.java | 234 +++-- .../flink/catalog/StarRocksUtils.java | 197 +--- .../connector/flink/catalog/TypeUtils.java | 54 +- ...Test.java => FlinkCatalogFactoryTest.java} | 24 +- .../flink/it/StarRocksITTestBase.java | 6 +- .../flink/it/catalog/FlinkCatalogTest.java | 386 ++++++++ .../it/catalog/StarRocksCatalogTest.java | 416 ++------ 13 files changed, 1915 insertions(+), 1272 deletions(-) create mode 100644 src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java create mode 100644 src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogException.java delete mode 100644 src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java rename src/test/java/com/starrocks/connector/flink/catalog/{StarRocksCatalogFactoryTest.java => FlinkCatalogFactoryTest.java} (76%) create mode 100644 src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java diff --git a/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java new file mode 100644 index 00000000..0f11016e --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java @@ -0,0 +1,615 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.starrocks.connector.flink.catalog; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TemporaryClassLoaderContext; + +import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; +import com.starrocks.connector.flink.table.source.StarRocksSourceOptions; +import org.apache.commons.compress.utils.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.starrocks.connector.flink.catalog.JdbcUtils.verifyJdbcDriver; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; + +/** Flink catalog for StarRocks. 
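Delegates most metadata access and DDL to the JDBC-based {@link StarRocksCatalog}, and fills in source/sink connector options for the tables it returns.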
*/ +public class FlinkCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalog.class); + + private final String jdbcUrl; + private final String httpUrl; + private final String username; + private final String password; + private final Configuration sourceBaseConfig; + private final Configuration sinkBaseConfig; + private final Configuration tableBaseConfig; + private final ClassLoader userClassLoader; + private final StarRocksCatalog starRocksCatalog; + + public FlinkCatalog( + String name, + String jdbcUrl, + String httpUrl, + String username, + String password, + String defaultDatabase, + Configuration sourceBaseConfig, + Configuration sinkBaseConfig, + Configuration tableBaseConfig, + ClassLoader userClassLoader) { + super(name, defaultDatabase); + this.jdbcUrl = jdbcUrl; + this.httpUrl = httpUrl; + this.username = username; + this.password = password; + this.sourceBaseConfig = sourceBaseConfig; + this.sinkBaseConfig = sinkBaseConfig; + this.tableBaseConfig = tableBaseConfig; + this.userClassLoader = userClassLoader; + this.starRocksCatalog = new StarRocksCatalog(jdbcUrl, username, password); + } + + @Override + public Optional getFactory() { + return Optional.of(new StarRocksDynamicTableFactory()); + } + + @Override + public void open() throws CatalogException { + // Refer to flink-connector-jdbc's AbstractJdbcCatalog + // load the Driver using userClassLoader explicitly, see FLINK-15635 for more detail + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userClassLoader)) { + verifyJdbcDriver(); + // test connection, fail early if we cannot connect to the database + try (Connection conn = getConnection()) { + } catch (SQLException e) { + throw new ValidationException( + String.format("Failed to connect to StarRocks via JDBC: %s.", jdbcUrl), e); + } + } + LOG.info("Open Flink catalog for StarRocks {}", getName()); + } + + @Override + public void close() throws CatalogException { + LOG.info("Close Flink catalog for StarRocks {}", getName()); + } + + // ------ databases ------ + + @Override + public List listDatabases() throws CatalogException { + return executeSingleColumnStatement( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;"); + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (databaseExists(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + List dbList = executeSingleColumnStatement( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = ?;", + databaseName); + return !dbList.isEmpty(); + } + + @Override + public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (databaseExists(databaseName)) { + if (ignoreIfExists) { + return; + } + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + 
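+ // Delegate the actual CREATE DATABASE statement to the JDBC-based StarRocksCatalog.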
+ try { + starRocksCatalog.createDatabase(databaseName, ignoreIfExists); + } catch (StarRocksCatalogException e) { + throw new CatalogException( + String.format("Failed to create database %s, ignoreIfExists: %s", + databaseName, ignoreIfExists), + e); + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ tables ------ + + @Override + public List listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + return executeSingleColumnStatement( + "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", + databaseName); + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + List tableList = executeSingleColumnStatement( + "SELECT TABLE_NAME FROM information_schema.`TABLES` " + + "WHERE TABLE_SCHEMA=? and TABLE_NAME=?", + tablePath.getDatabaseName(), + tablePath.getObjectName() + ); + return !tableList.isEmpty(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + try { + StarRocksTable starRocksTable = starRocksCatalog.getTable( + tablePath.getDatabaseName(), tablePath.getObjectName()).orElse(null); + if (starRocksTable == null) { + throw new TableNotExistException(getName(), tablePath); + } + Schema.Builder flinkSchemaBuilder = Schema.newBuilder(); + Set primaryKeys = new HashSet<>(); + if (starRocksTable.getTableType() == StarRocksTable.TableType.PRIMARY_KEY && + starRocksTable.getTableKeys().isPresent()) { + flinkSchemaBuilder.primaryKey(starRocksTable.getTableKeys().get()); + primaryKeys.addAll(starRocksTable.getTableKeys().get()); + } + for (StarRocksColumn column : starRocksTable.getColumns()) { + flinkSchemaBuilder.column( + column.getColumnName(), + TypeUtils.toFlinkType( + column.getDataType(), + column.getColumnSize().orElse(null), + column.getDecimalDigits().orElse(null), + column.isNullable() + ) + ); + } + Schema flinkSchema = flinkSchemaBuilder.build(); + Map properties = new HashMap<>(); + properties.put(CONNECTOR.key(), CatalogOptions.IDENTIFIER); + properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); + properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); + + return CatalogTable.of(flinkSchema, starRocksTable.getComment().orElse(null), Lists.newArrayList(), properties); + } catch (StarRocksCatalogException e) { + throw new CatalogException( + String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e); + } + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new 
DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(getName(), tablePath); + } + + StarRocksTable starRocksTable = StarRocksUtils.toStarRocksTable(getName(), tablePath, tableBaseConfig, table); + try { + starRocksCatalog.createTable(starRocksTable, ignoreIfExists); + LOG.info("Successfully created table {} in catalog {}", tablePath.getFullName(), getName()); + } catch (StarRocksCatalogException e) { + LOG.error("Failed to create table {} in catalog {}", tablePath.getFullName(), getName(), e); + throw new CatalogException( + String.format("Failed to create table %s in catalog %s", + tablePath.getFullName(), getName()), e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(getName(), tablePath); + } + + try { + String dropSql = String.format( + "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getObjectName()); + executeUpdateStatement(dropSql); + } catch (Exception e) { + throw new CatalogException(String.format("Failed to drop table %s in catalog %s", + tablePath.getFullName(), getName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + // TODO Flink has supported adding/dropping columns since 1.17. Implement it in the future if needed. + throw new UnsupportedOperationException(); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + private Map getSourceConfig(String database, String table) { + Map sourceConfig = new HashMap<>(sourceBaseConfig.toMap()); + setIfNotExist(sourceConfig, StarRocksSourceOptions.JDBC_URL, jdbcUrl); + setIfNotExist(sourceConfig, StarRocksSourceOptions.SCAN_URL, httpUrl); + setIfNotExist(sourceConfig, StarRocksSourceOptions.USERNAME, username); + setIfNotExist(sourceConfig, StarRocksSourceOptions.PASSWORD, password); + sourceConfig.put(StarRocksSourceOptions.DATABASE_NAME.key(), database); + sourceConfig.put(StarRocksSourceOptions.TABLE_NAME.key(), table); + return sourceConfig; + } + + private Map getSinkConfig(String database, String table) { + Map sinkConfig = new HashMap<>(sinkBaseConfig.toMap()); + setIfNotExist(sinkConfig, StarRocksSinkOptions.JDBC_URL, jdbcUrl); + setIfNotExist(sinkConfig, StarRocksSinkOptions.LOAD_URL, httpUrl); + setIfNotExist(sinkConfig, StarRocksSinkOptions.USERNAME, username); + setIfNotExist(sinkConfig, StarRocksSinkOptions.PASSWORD, password); + sinkConfig.put(StarRocksSinkOptions.DATABASE_NAME.key(), database); + sinkConfig.put(StarRocksSinkOptions.TABLE_NAME.key(), table); + if (sinkConfig.containsKey(StarRocksSinkOptions.SINK_LABEL_PREFIX.key())) { + String rawLabelPrefix = sinkConfig.get(StarRocksSinkOptions.SINK_LABEL_PREFIX.key()); + String labelPrefix = String.join("_", rawLabelPrefix, database, table); + sinkConfig.put(StarRocksSinkOptions.SINK_LABEL_PREFIX.key(), labelPrefix); + } + return sinkConfig; + } + + private void setIfNotExist(Map config, ConfigOption option, String value) { + if (!config.containsKey(option.key())) { + config.put(option.key(), value); + } + } + + 
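// ------ JDBC helpers ------ + 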
private int executeUpdateStatement(String sql) throws SQLException { + try (Connection connection = getConnection(); + Statement statement = connection.createStatement()) { + return statement.executeUpdate(sql); + } + } + + private List executeSingleColumnStatement(String sql, Object... params) { + try (Connection conn = getConnection(); + PreparedStatement statement = conn.prepareStatement(sql)) { + List columnValues = Lists.newArrayList(); + if (params != null) { + for (int i = 0; i < params.length; i++) { + statement.setObject(i + 1, params[i]); + } + } + try (ResultSet rs = statement.executeQuery()) { + while (rs.next()) { + String columnValue = rs.getString(1); + columnValues.add(columnValue); + } + } + return columnValues; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to execute sql: %s", sql), e); + } + } + + private Connection getConnection() throws SQLException { + // TODO reuse the connection + return DriverManager.getConnection(jdbcUrl, username, password); + } + + // ------ views ------ + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ partitions ------ + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + return false; + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ functions ------ + + @Override + public List listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); + } + + + 
@Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + // ------ statistics ------ + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @VisibleForTesting + String getJdbcUrl() { + return jdbcUrl; + } + + @VisibleForTesting + String getHttpUrl() { + return httpUrl; + } + + @VisibleForTesting + String getUsername() { + return username; + } + + @VisibleForTesting + String getPassword() { + return password; + } + + @VisibleForTesting + Configuration getSourceBaseConfig() { + return sourceBaseConfig; + } + + @VisibleForTesting + Configuration getSinkBaseConfig() { + return sinkBaseConfig; + } + + @VisibleForTesting + Configuration getTableBaseConfig() { + return tableBaseConfig; + } +} diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java index 63eaae1b..3cdbc88e 100644 --- 
a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java @@ -20,376 +20,393 @@ package com.starrocks.connector.flink.catalog; -import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; -import com.starrocks.connector.flink.table.source.StarRocksSourceOptions; -import org.apache.commons.compress.utils.Lists; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.AbstractCatalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogPartition; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; -import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; -import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; -import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; -import org.apache.flink.table.catalog.exceptions.TablePartitionedException; -import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; -import org.apache.flink.table.catalog.stats.CatalogTableStatistics; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.factories.Factory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; -import org.apache.flink.util.TemporaryClassLoaderContext; + +import org.apache.commons.compress.utils.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; +import java.util.stream.Collectors; -import static com.starrocks.connector.flink.catalog.JdbcUtils.verifyJdbcDriver; -import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +/** Responsible for reading and writing metadata such as database/table from StarRocks. */ +public class StarRocksCatalog implements Serializable { -/** Flink catalog for StarRocks. 
*/ -public class StarRocksCatalog extends AbstractCatalog { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class); private final String jdbcUrl; - private final String httpUrl; private final String username; private final String password; - private final Configuration sourceBaseConfig; - private final Configuration sinkBaseConfig; - private final Configuration tableBaseConfig; - private final ClassLoader userClassLoader; - - public StarRocksCatalog( - String name, - String jdbcUrl, - String httpUrl, - String username, - String password, - String defaultDatabase, - Configuration sourceBaseConfig, - Configuration sinkBaseConfig, - Configuration tableBaseConfig, - ClassLoader userClassLoader) { - super(name, defaultDatabase); + + public StarRocksCatalog(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; - this.httpUrl = httpUrl; this.username = username; this.password = password; - this.sourceBaseConfig = sourceBaseConfig; - this.sinkBaseConfig = sinkBaseConfig; - this.tableBaseConfig = tableBaseConfig; - this.userClassLoader = userClassLoader; - } - - @Override - public Optional getFactory() { - return Optional.of(new StarRocksDynamicTableFactory()); - } - - @Override - public void open() throws CatalogException { - // Refer to flink-connector-jdbc's AbstractJdbcCatalog - // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail - try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userClassLoader)) { - verifyJdbcDriver(); - // test connection, fail early if we cannot connect to database - try (Connection conn = getConnection()) { - } catch (SQLException e) { - throw new ValidationException( - String.format("Failed to connect StarRocks via JDBC: %s.", jdbcUrl), e); - } - } - LOG.info("Open StarRocks catalog {}", getName()); } - @Override - public void close() throws CatalogException { - LOG.info("Close StarRocks catalog {}", getName()); - } - - // ------ databases ------ - - @Override - public List listDatabases() throws CatalogException { - return executeSingleColumnStatement( - "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;"); - } - - @Override - public CatalogDatabase getDatabase(String databaseName) - throws DatabaseNotExistException, CatalogException { + /** + * Check if a database exists in this catalog. 
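+ * The check is implemented by querying `information_schema`.`SCHEMATA` via JDBC.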
+ * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog, false otherwise + * @throws StarRocksCatalogException in case of any runtime exception + */ + public boolean databaseExists(String databaseName) throws StarRocksCatalogException { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "database name cannot be null or empty."); - if (databaseExists(databaseName)) { - return new CatalogDatabaseImpl(Collections.emptyMap(), null); - } else { - throw new DatabaseNotExistException(getName(), databaseName); - } - } - - @Override - public boolean databaseExists(String databaseName) throws CatalogException { - Preconditions.checkArgument( - !StringUtils.isNullOrWhitespaceOnly(databaseName), - "database name cannot be null or empty."); - List dbList = executeSingleColumnStatement( - "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = ?;", + String querySql = String.format( + "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = '%s';", databaseName); - return !dbList.isEmpty(); + try { + List dbList = executeSingleColumnStatement(querySql); + return !dbList.isEmpty(); + } catch (Exception e) { + LOG.error("Failed to check if database exists, database: {}, sql: {}", databaseName, querySql, e); + throw new StarRocksCatalogException( + String.format("Failed to check if database exists, database: %s", databaseName), e); + } } - @Override - public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) - throws DatabaseAlreadyExistException, CatalogException { + /** + * Create a database. + * + * @param databaseName Name of the database + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a StarRocksCatalogException, if set to true, do nothing. + * @throws StarRocksCatalogException in case of any runtime exception + */ + public void createDatabase(String databaseName, boolean ignoreIfExists) + throws StarRocksCatalogException { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "database name cannot be null or empty."); - if (databaseExists(databaseName)) { - if (ignoreIfExists) { - return; - } - throw new DatabaseAlreadyExistException(getName(), databaseName); - } - + String sql = buildCreateDatabaseSql(databaseName, ignoreIfExists); try { - String sql = String.format("CREATE DATABASE %s%s;", - ignoreIfExists ? 
"IF NOT EXISTS " : "", databaseName); executeUpdateStatement(sql); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed to create database %s, ignoreIfExists: %s", + LOG.info("Successful to create database {}, sql: {}", databaseName, sql); + } catch (Exception e) { + LOG.info("Failed to create database {}, sql: {}", databaseName, sql, e); + throw new StarRocksCatalogException( + String.format( + "Failed to create database %s, ignoreIfExists: %s", databaseName, ignoreIfExists), e); } } - @Override - public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) - throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) - throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - // ------ tables ------ - - @Override - public List listTables(String databaseName) throws DatabaseNotExistException, CatalogException { + /** + * Returns a {@link StarRocksTable} identified by the given databaseName and tableName. + * + * @param databaseName Name of the database + * @param tableName Name of the table + * @return an optional of the requested table. null if the table does not exist. + * @throws StarRocksCatalogException in case of any runtime exception + */ + public Optional getTable(String databaseName, String tableName) + throws StarRocksCatalogException { Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "database name cannot be null or empty."); - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(getName(), databaseName); - } - - return executeSingleColumnStatement( - "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", - databaseName); - } - - @Override - public boolean tableExists(ObjectPath tablePath) throws CatalogException { - List tableList = executeSingleColumnStatement( - "SELECT TABLE_NAME FROM information_schema.`TABLES` " + - "WHERE TABLE_SCHEMA=? and TABLE_NAME=?", - tablePath.getDatabaseName(), - tablePath.getObjectName() - ); - return !tableList.isEmpty(); - } + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "table name cannot be null or empty."); - @Override - public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(getName(), tablePath); - } + final String tableSchemaQuery = + "SELECT `COLUMN_NAME`, `DATA_TYPE`, `ORDINAL_POSITION`, `COLUMN_SIZE`, `DECIMAL_DIGITS`, " + + "`IS_NULLABLE`, `COLUMN_KEY`, `COLUMN_COMMENT` FROM `information_schema`.`COLUMNS` " + + "WHERE `TABLE_SCHEMA`=? 
AND `TABLE_NAME`=?;"; + StarRocksTable.TableType tableType = StarRocksTable.TableType.UNKNOWN; + List columns = new ArrayList<>(); + List tableKeys = new ArrayList<>(); try (Connection connection = getConnection()) { - StarRocksTable starRocksTable = StarRocksUtils.getStarRocksTable( - connection, tablePath.getDatabaseName(), tablePath.getObjectName()); - - StarRocksSchema starRocksSchema = starRocksTable.getSchema(); - Schema.Builder flinkSchemaBuilder = Schema.newBuilder(); - Set primaryKeys = new HashSet<>(); - if (starRocksTable.getTableType() == StarRocksTable.TableType.PRIMARY && - starRocksTable.getTableKeys() != null) { - flinkSchemaBuilder.primaryKey(starRocksTable.getTableKeys()); - primaryKeys.addAll(starRocksTable.getTableKeys()); - } - for (StarRocksColumn column : starRocksSchema.getColumns()) { - flinkSchemaBuilder.column( - column.getName(), - TypeUtils.toFlinkType( - column.getType(), - column.getSize(), - column.getScale(), - !primaryKeys.contains(column.getName()) - ) - ); + try (PreparedStatement statement = connection.prepareStatement(tableSchemaQuery)) { + statement.setObject(1, databaseName); + statement.setObject(2, tableName); + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + String name = resultSet.getString("COLUMN_NAME"); + String type = resultSet.getString("DATA_TYPE"); + int position = resultSet.getInt("ORDINAL_POSITION"); + Integer size = resultSet.getInt("COLUMN_SIZE"); + if (resultSet.wasNull()) { + size = null; + } + Integer scale = resultSet.getInt("DECIMAL_DIGITS"); + if (resultSet.wasNull()) { + scale = null; + } + String isNullable = resultSet.getString("IS_NULLABLE"); + String comment = resultSet.getString("COLUMN_COMMENT"); + StarRocksColumn column = + new StarRocksColumn.Builder() + .setColumnName(name) + .setOrdinalPosition(position - 1) + .setDataType(type) + .setColumnSize(size) + .setDecimalDigits(scale) + .setNullable( + isNullable == null + || !isNullable.equalsIgnoreCase("NO")) + .setColumnComment(comment) + .build(); + columns.add(column); + + // Only primary key table has value in this field. 
and the value is "PRI" + String columnKey = resultSet.getString("COLUMN_KEY"); + if (!StringUtils.isNullOrWhitespaceOnly(columnKey)) { + if (columnKey.equalsIgnoreCase("PRI") + && tableType == StarRocksTable.TableType.UNKNOWN) { + tableType = StarRocksTable.TableType.PRIMARY_KEY; + } + tableKeys.add(column.getColumnName()); + } + } + } } - - Schema flinkSchema = flinkSchemaBuilder.build(); - - Map properties = new HashMap<>(); - properties.put(CONNECTOR.key(), CatalogOptions.IDENTIFIER); - properties.putAll(getSourceConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); - properties.putAll(getSinkConfig(tablePath.getDatabaseName(), tablePath.getObjectName())); - - return CatalogTable.of(flinkSchema, starRocksTable.getComment(), Lists.newArrayList(), properties); } catch (Exception e) { - throw new CatalogException( - String.format("Failed to get table %s in catalog %s", tablePath.getFullName(), getName()), e); - } - } - - @Override - public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!databaseExists(tablePath.getDatabaseName())) { - throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + throw new StarRocksCatalogException( + String.format("Failed to get table %s.%s", databaseName, tableName), e); } - if (tableExists(tablePath)) { - if (ignoreIfExists) { - return; - } - throw new TableAlreadyExistException(getName(), tablePath); + StarRocksTable starRocksTable = null; + if (!columns.isEmpty()) { + starRocksTable = + new StarRocksTable.Builder() + .setDatabaseName(databaseName) + .setTableName(tableName) + .setTableType(tableType) + .setColumns(columns) + .setTableKeys(tableKeys) + .build(); } - - StarRocksTable starRocksTable = StarRocksUtils.toStarRocksTable(getName(), tablePath, tableBaseConfig, table); - String createTableSql = StarRocksUtils.buildCreateTableSql(starRocksTable, ignoreIfExists); + return Optional.ofNullable(starRocksTable); + } + + /** + * Creates a table. + * + * @param table the table definition + * @param ignoreIfExists flag to specify behavior when a table already exists. if set to false, + * it throws a TableAlreadyExistException, if set to true, do nothing. 
+ * @throws StarRocksCatalogException in case of any runtime exception + */ + public void createTable(StarRocksTable table, boolean ignoreIfExists) + throws StarRocksCatalogException { + String createTableSql = buildCreateTableSql(table, ignoreIfExists); try { executeUpdateStatement(createTableSql); - LOG.info("Success to create table {} in catalog {}", tablePath.getFullName(), getName()); - LOG.info("The create table DDL:\n{}", createTableSql); + LOG.info("Successfully created table {}.{}, sql: {}", + table.getDatabaseName(), table.getTableName(), createTableSql); } catch (Exception e) { - LOG.error("Failed to create table {} in catalog {}", tablePath.getFullName(), getName(), e); - LOG.error("The failed create table DDL:\n{}", createTableSql); - throw new CatalogException( - String.format("Failed to create table %s in catalog %s", - tablePath.getFullName(), getName()), e); + LOG.error("Failed to create table {}.{}, sql: {}", + table.getDatabaseName(), table.getTableName(), createTableSql, e); + throw new StarRocksCatalogException( + String.format( + "Failed to create table %s.%s", + table.getDatabaseName(), table.getTableName()), + e); } } - @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - if (!tableExists(tablePath)) { - if (ignoreIfNotExists) { - return; - } - throw new TableNotExistException(getName(), tablePath); - } + /** + * Add columns to a table. Note that these columns will be appended at the end of the + * table schema. + * + * @param databaseName Name of the database + * @param tableName Name of the table + * @param addColumns Columns to add + * @param timeoutSecond Timeout for a schema change on StarRocks side + * @throws StarRocksCatalogException in case of any runtime exception + */ + public void alterAddColumns( + String databaseName, + String tableName, + List addColumns, + long timeoutSecond) + throws StarRocksCatalogException { + Preconditions.checkArgument( !StringUtils.isNullOrWhitespaceOnly(databaseName), "database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "table name cannot be null or empty."); + Preconditions.checkArgument(!addColumns.isEmpty(), "Added columns should not be empty."); + String alterSql = + buildAlterAddColumnsSql(databaseName, tableName, addColumns, timeoutSecond); try { - String dropSql = String.format( - "DROP TABLE `%s`.`%s`;", tablePath.getDatabaseName(), tablePath.getObjectName()); - executeUpdateStatement(dropSql); + executeAlter(databaseName, tableName, alterSql, timeoutSecond); + LOG.info("Successfully added columns to {}.{}, sql: {}", databaseName, tableName, alterSql); } catch (Exception e) { - throw new CatalogException(String.format("Failed to drop table %s in catalog %s", - tablePath.getFullName(), getName()), e); + LOG.error("Failed to add columns to {}.{}, sql: {}", databaseName, tableName, alterSql, e); + throw new StarRocksCatalogException( + String.format("Failed to add columns to %s.%s", databaseName, tableName), e); } } - @Override - public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - // TODO Flink supports to add/drop column since 1.17. Implement it in the future if needed. - throw new UnsupportedOperationException(); - } + /** + * Drop columns of a table. 
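+ * The schema change may run asynchronously on the StarRocks side, so this method polls + * the alter job state until the job finishes, is cancelled, or the timeout elapses.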
+ * + * @param databaseName Name of the database + * @param tableName Name of the table + * @param dropColumns Columns to drop + * @param timeoutSecond Timeout for a schema change on StarRocks side + * @throws StarRocksCatalogException in case of any runtime exception + */ + public void alterDropColumns( + String databaseName, String tableName, List dropColumns, long timeoutSecond) + throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "table name cannot be null or empty."); + Preconditions.checkArgument(!dropColumns.isEmpty(), "Drop columns should not be empty."); - @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - throw new UnsupportedOperationException(); + String alterSql = + buildAlterDropColumnsSql(databaseName, tableName, dropColumns, timeoutSecond); + try { + executeAlter(databaseName, tableName, alterSql, timeoutSecond); + LOG.info("Successfully dropped columns from {}.{}, sql: {}", databaseName, tableName, alterSql); + } catch (Exception e) { + LOG.error("Failed to drop columns from {}.{}, sql: {}", databaseName, tableName, alterSql, e); + throw new StarRocksCatalogException( + String.format("Failed to drop columns from %s.%s", databaseName, tableName), e); + } } - private Map getSourceConfig(String database, String table) { - Map sourceConfig = new HashMap<>(sourceBaseConfig.toMap()); - setIfNotExist(sourceConfig, StarRocksSourceOptions.JDBC_URL, jdbcUrl); - setIfNotExist(sourceConfig, StarRocksSourceOptions.SCAN_URL, httpUrl); - setIfNotExist(sourceConfig, StarRocksSourceOptions.USERNAME, username); - setIfNotExist(sourceConfig, StarRocksSourceOptions.PASSWORD, password); - sourceConfig.put(StarRocksSourceOptions.DATABASE_NAME.key(), database); - sourceConfig.put(StarRocksSourceOptions.TABLE_NAME.key(), table); - return sourceConfig; - } + private void executeAlter( + String databaseName, String tableName, String alterSql, long timeoutSecond) + throws StarRocksCatalogException { + try { + executeUpdateStatement(alterSql); + } catch (SQLException e) { + throw new StarRocksCatalogException( + String.format("Failed to execute alter sql for %s.%s", databaseName, tableName), + e); + } + + // Alter job may be executed asynchronously, so check the job state periodically before + // timeout + long startTime = System.currentTimeMillis(); + int retries = 0; + while (System.currentTimeMillis() - startTime < timeoutSecond * 1000) { + try { + AlterJobState jobState = getAlterJobState(databaseName, tableName); + retries = 0; + LOG.info("Get alter job state for {}.{}, {}", databaseName, tableName, jobState); + if ("FINISHED".equalsIgnoreCase(jobState.state)) { + return; + } else if ("CANCELLED".equalsIgnoreCase(jobState.state)) { + throw new StarRocksCatalogException( + "Alter job is cancelled, job state is " + jobState); + } + } catch (Exception e) { + LOG.warn("Failed to get alter job state for {}.{} with retries {}", databaseName, tableName, retries, e); + retries += 1; + if (retries > 3) { + throw new StarRocksCatalogException( + String.format( + "Failed to get alter job state for %s.%s", + databaseName, tableName), + e); + } + } - private Map getSinkConfig(String database, String table) { - Map sinkConfig = new HashMap<>(sinkBaseConfig.toMap()); - 
setIfNotExist(sinkConfig, StarRocksSinkOptions.JDBC_URL, jdbcUrl); - setIfNotExist(sinkConfig, StarRocksSinkOptions.LOAD_URL, httpUrl); - setIfNotExist(sinkConfig, StarRocksSinkOptions.USERNAME, username); - setIfNotExist(sinkConfig, StarRocksSinkOptions.PASSWORD, password); - sinkConfig.put(StarRocksSinkOptions.DATABASE_NAME.key(), database); - sinkConfig.put(StarRocksSinkOptions.TABLE_NAME.key(), table); - if (sinkConfig.containsKey(StarRocksSinkOptions.SINK_LABEL_PREFIX.key())) { - String rawLabelPrefix = sinkConfig.get(StarRocksSinkOptions.SINK_LABEL_PREFIX.key()); - String labelPrefix = String.join("_", rawLabelPrefix, database, table); - sinkConfig.put(StarRocksSinkOptions.SINK_LABEL_PREFIX.key(), labelPrefix); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new StarRocksCatalogException( + String.format( + "Failed to get alter job state for %s.%s because of interruption", + databaseName, tableName)); + } } } - private void setIfNotExist(Map config, ConfigOption option, String value) { - if (!config.containsKey(option.key())) { - config.put(option.key(), value); + private static class AlterJobState { + String jobId; + String tableName; + String createTime; + String finishTime; + String transactionId; + String state; + String msg; + + @Override + public String toString() { + return "AlterJobState{" + + "jobId='" + jobId + '\'' + + ", tableName='" + tableName + '\'' + + ", createTime='" + createTime + '\'' + + ", finishTime='" + finishTime + '\'' + + ", transactionId='" + transactionId + '\'' + + ", state='" + state + '\'' + + ", msg='" + msg + '\'' + + '}'; + } + + public AlterJobState( + String jobId, + String tableName, + String createTime, + String finishTime, + String transactionId, + String state, + String msg) { + this.jobId = jobId; + this.tableName = tableName; + this.createTime = createTime; + this.finishTime = finishTime; + this.transactionId = transactionId; + this.state = state; + this.msg = msg; + } } - private int executeUpdateStatement(String sql) throws SQLException { - try (Connection connection = getConnection(); - Statement statement = connection.createStatement()) { - return statement.executeUpdate(sql); + private AlterJobState getAlterJobState(String databaseName, String tableName) + throws SQLException { + String showAlterSql = String.format( + "SHOW ALTER TABLE COLUMN FROM `%s` WHERE TableName = '%s' ORDER BY JobId DESC LIMIT 1;", + databaseName, tableName); + try (Connection connection = getConnection()) { + try (PreparedStatement statement = connection.prepareStatement(showAlterSql)) { + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + return new AlterJobState( + resultSet.getString("JobId"), + resultSet.getString("TableName"), + resultSet.getString("CreateTime"), + resultSet.getString("FinishTime"), + resultSet.getString("TransactionId"), + resultSet.getString("State"), + resultSet.getString("Msg")); + } + } + } } + throw new SQLException( + String.format("Alter job state for %s.%s does not exist", databaseName, tableName)); } - private List executeSingleColumnStatement(String sql, Object... 
params) { + private List executeSingleColumnStatement(String sql) throws SQLException { try (Connection conn = getConnection(); PreparedStatement statement = conn.prepareStatement(sql)) { List columnValues = Lists.newArrayList(); - if (params != null) { - for (int i = 0; i < params.length; i++) { - statement.setObject(i + 1, params[i]); - } - } try (ResultSet rs = statement.executeQuery()) { while (rs.next()) { String columnValue = rs.getString(1); @@ -397,222 +414,154 @@ private List executeSingleColumnStatement(String sql, Object... params) } } return columnValues; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed to execute sql: %s", sql), e); } } - private Connection getConnection() throws SQLException { - // TODO reuse the connection - return DriverManager.getConnection(jdbcUrl, username, password); - } - - // ------ views ------ - - @Override - public List listViews(String databaseName) throws DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - // ------ partitions ------ - - @Override - public List listPartitions(ObjectPath tablePath) - throws TableNotExistException, TableNotPartitionedException, CatalogException { - return Collections.emptyList(); - } - - @Override - public List listPartitions( - ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws TableNotExistException, TableNotPartitionedException, - PartitionSpecInvalidException, CatalogException { - return Collections.emptyList(); - } - - @Override - public List listPartitionsByFilter( - ObjectPath tablePath, List filters) - throws TableNotExistException, TableNotPartitionedException, CatalogException { - return Collections.emptyList(); - } - - @Override - public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws PartitionNotExistException, CatalogException { - throw new PartitionNotExistException(getName(), tablePath, partitionSpec); - } - - @Override - public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws CatalogException { - return false; - } - - @Override - public void createPartition( - ObjectPath tablePath, - CatalogPartitionSpec partitionSpec, - CatalogPartition partition, - boolean ignoreIfExists) - throws TableNotExistException, TableNotPartitionedException, - PartitionSpecInvalidException, PartitionAlreadyExistsException, - CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void dropPartition( - ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterPartition( - ObjectPath tablePath, - CatalogPartitionSpec partitionSpec, - CatalogPartition newPartition, - boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - // ------ functions ------ - - @Override - public List listFunctions(String dbName) throws DatabaseNotExistException, CatalogException { - return Collections.emptyList(); - } - - @Override - public CatalogFunction getFunction(ObjectPath functionPath) - throws FunctionNotExistException, CatalogException { - throw new FunctionNotExistException(getName(), functionPath); - } - - - @Override - public boolean functionExists(ObjectPath functionPath) throws CatalogException { - return false; - } - - @Override - public void createFunction(ObjectPath 
functionPath, CatalogFunction function, boolean ignoreIfExists) - throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterFunction( - ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) - throws FunctionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) - throws FunctionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - // ------ statistics ------ - - @Override - public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) - throws TableNotExistException, CatalogException { - return CatalogTableStatistics.UNKNOWN; - } - - @Override - public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) - throws TableNotExistException, CatalogException { - return CatalogColumnStatistics.UNKNOWN; - } - - @Override - public CatalogTableStatistics getPartitionStatistics( - ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws PartitionNotExistException, CatalogException { - return CatalogTableStatistics.UNKNOWN; - } - - @Override - public CatalogColumnStatistics getPartitionColumnStatistics( - ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws PartitionNotExistException, CatalogException { - return CatalogColumnStatistics.UNKNOWN; - } - - @Override - public void alterTableStatistics( - ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterTableColumnStatistics( - ObjectPath tablePath, - CatalogColumnStatistics columnStatistics, - boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException, TablePartitionedException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterPartitionStatistics( - ObjectPath tablePath, - CatalogPartitionSpec partitionSpec, - CatalogTableStatistics partitionStatistics, - boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterPartitionColumnStatistics( - ObjectPath tablePath, - CatalogPartitionSpec partitionSpec, - CatalogColumnStatistics columnStatistics, - boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); + private void executeUpdateStatement(String sql) throws SQLException { + try (Connection connection = getConnection(); + Statement statement = connection.createStatement()) { + statement.executeUpdate(sql); + } } - @VisibleForTesting - String getJdbcUrl() { - return jdbcUrl; + private Connection getConnection() throws SQLException { + return DriverManager.getConnection(jdbcUrl, username, password); } - @VisibleForTesting - String getHttpUrl() { - return httpUrl; - } + // ------------------------------------------------------------------------------------------ + // StarRocks DDL SQL + // ------------------------------------------------------------------------------------------ - @VisibleForTesting - String getUsername() { - return username; + private String buildCreateDatabaseSql(String databaseName, boolean ignoreIfExists) { + return String.format( + "CREATE DATABASE %s%s;", ignoreIfExists ? 
"IF NOT EXISTS " : "", databaseName); } - @VisibleForTesting - String getPassword() { - return password; - } + private String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) { + StringBuilder builder = new StringBuilder(); + builder.append( + String.format( + "CREATE TABLE %s`%s`.`%s`", + ignoreIfExists ? "IF NOT EXISTS " : "", + table.getDatabaseName(), + table.getTableName())); + builder.append(" (\n"); + String columnsStmt = + table.getColumns().stream() + .map(this::buildColumnStmt) + .collect(Collectors.joining(",\n")); + builder.append(columnsStmt); + builder.append("\n) "); - @VisibleForTesting - Configuration getSourceBaseConfig() { - return sourceBaseConfig; - } + Preconditions.checkArgument( + table.getTableType() == StarRocksTable.TableType.PRIMARY_KEY, + "Not support to build create table sql for table type " + table.getTableType()); + Preconditions.checkArgument( + table.getTableKeys().isPresent(), + "Can't build create table sql because there is no table keys"); + String tableKeys = + table.getTableKeys().get().stream() + .map(key -> "`" + key + "`") + .collect(Collectors.joining(", ")); + builder.append(String.format("PRIMARY KEY (%s)\n", tableKeys)); - @VisibleForTesting - Configuration getSinkBaseConfig() { - return sinkBaseConfig; - } + Preconditions.checkArgument( + table.getDistributionKeys().isPresent(), + "Can't build create table sql because there is no distribution keys"); + String distributionKeys = + table.getDistributionKeys().get().stream() + .map(key -> "`" + key + "`") + .collect(Collectors.joining(", ")); + builder.append(String.format("DISTRIBUTED BY HASH (%s)", distributionKeys)); + if (table.getNumBuckets().isPresent()) { + builder.append(" BUCKETS "); + builder.append(table.getNumBuckets().get()); + } + if (!table.getProperties().isEmpty()) { + builder.append("\nPROPERTIES (\n"); + String properties = + table.getProperties().entrySet().stream() + .map( + entry -> + String.format( + "\"%s\" = \"%s\"", + entry.getKey(), entry.getValue())) + .collect(Collectors.joining(",\n")); + builder.append(properties); + builder.append("\n)"); + } + builder.append(";"); + return builder.toString(); + } + + private String buildAlterAddColumnsSql( + String databaseName, + String tableName, + List addColumns, + long timeoutSecond) { + StringBuilder builder = new StringBuilder(); + builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName)); + String columnsStmt = + addColumns.stream() + .map(col -> "ADD COLUMN " + buildColumnStmt(col)) + .collect(Collectors.joining(", ")); + builder.append(columnsStmt); + builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond)); + builder.append(";"); + return builder.toString(); + } + + private String buildAlterDropColumnsSql( + String databaseName, String tableName, List dropColumns, long timeoutSecond) { + StringBuilder builder = new StringBuilder(); + builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName)); + String columnsStmt = + dropColumns.stream() + .map(col -> String.format("DROP COLUMN `%s`", col)) + .collect(Collectors.joining(", ")); + builder.append(columnsStmt); + builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond)); + builder.append(";"); + return builder.toString(); + } + + private String buildColumnStmt(StarRocksColumn column) { + StringBuilder builder = new StringBuilder(); + builder.append("`"); + builder.append(column.getColumnName()); + builder.append("` "); + builder.append( + getFullColumnType( + 
column.getDataType(), column.getColumnSize(), column.getDecimalDigits())); + builder.append(" "); + builder.append(column.isNullable() ? "NULL" : "NOT NULL"); + if (column.getDefaultValue().isPresent()) { + builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get())); + } - @VisibleForTesting - Configuration getTableBaseConfig() { - return tableBaseConfig; + if (column.getColumnComment().isPresent()) { + builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get())); + } + return builder.toString(); + } + + private String getFullColumnType( + String type, Optional columnSize, Optional decimalDigits) { + String dataType = type.toUpperCase(); + switch (dataType) { + case "DECIMAL": + Preconditions.checkArgument( + columnSize.isPresent(), "DECIMAL type must have column size"); + Preconditions.checkArgument( + decimalDigits.isPresent(), "DECIMAL type must have decimal digits"); + return String.format("DECIMAL(%d, %s)", columnSize.get(), decimalDigits.get()); + case "CHAR": + case "VARCHAR": + Preconditions.checkArgument( + columnSize.isPresent(), type + " type must have column size"); + return String.format("%s(%d)", dataType, columnSize.get()); + default: + return dataType; + } } } diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogException.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogException.java new file mode 100644 index 00000000..c72980ee --- /dev/null +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogException.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.catalog; + +/** A catalog-related, runtime exception. */ +public class StarRocksCatalogException extends RuntimeException { + + /** @param message the detail message. */ + public StarRocksCatalogException(String message) { + super(message); + } + + /** @param cause the cause. */ + public StarRocksCatalogException(Throwable cause) { + super(cause); + } + + /** + * @param message the detail message. + * @param cause the cause. 
+     */
+    public StarRocksCatalogException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java
index cba43d28..a66e5a37 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactory.java
@@ -59,7 +59,7 @@ public Catalog createCatalog(Context context) {
         Configuration tableBaseConfig =
                 Configuration.fromMap(
                         getPrefixConfigs("table.", context.getOptions(), false));
-        return new StarRocksCatalog(
+        return new FlinkCatalog(
                 context.getName(),
                 helper.getOptions().get(CatalogOptions.FE_JDBC_URL),
                 helper.getOptions().get(CatalogOptions.FE_HTTP_URL),
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java
index bebb710b..dc4189c1 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java
@@ -20,145 +20,199 @@

 package com.starrocks.connector.flink.catalog;

-import java.io.Serializable;
 import javax.annotation.Nullable;

-/** Describe a column of StarRocks table. */
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Describes a column of a StarRocks table. The metadata comes from information_schema.COLUMNS. */
 public class StarRocksColumn implements Serializable {

     private static final long serialVersionUID = 1L;

-    private final String name;
-    // Index of column in table (starting at 0)
+    /** The name of the column. COLUMN_NAME in information_schema.COLUMNS. */
+    private final String columnName;
+
+    /**
+     * The position of the column within the table (starting at 0). ORDINAL_POSITION in
+     * information_schema.COLUMNS.
+     */
     private final int ordinalPosition;
-    private final String type;
-    @Nullable
-    private final String key;
-    @Nullable
-    private final Integer size;
-    @Nullable
-    private final Integer scale;
-    @Nullable
-    private final String defaultValue;
+
+    /** The column data type. DATA_TYPE in information_schema.COLUMNS. */
+    private final String dataType;
+
+    /** The column nullability. IS_NULLABLE in information_schema.COLUMNS. */
     private final boolean isNullable;
-    @Nullable
-    private final String comment;

-    private StarRocksColumn(String name, int ordinalPosition, String type, @Nullable String key,
-            @Nullable Integer size, @Nullable Integer scale, @Nullable String defaultValue,
-            boolean isNullable, @Nullable String comment) {
-        this.name = name;
+    /** The default value for the column. COLUMN_DEFAULT in information_schema.COLUMNS. */
+    @Nullable private final String defaultValue;
+
+    /**
+     * The column size. COLUMN_SIZE in information_schema.COLUMNS. For numeric data, this is the
+     * maximum precision. For character data, this is the length in characters. For other data
+     * types, this is null.
+     */
+    @Nullable private final Integer columnSize;
+
+    /**
+     * The number of fractional digits for numeric data. This is null for other data types.
+     * DECIMAL_DIGITS in information_schema.COLUMNS.
+     */
+    @Nullable private final Integer decimalDigits;
+
+    /** The column comment. COLUMN_COMMENT in information_schema.COLUMNS.
*/ + @Nullable private final String columnComment; + + + private StarRocksColumn( + String columnName, + int ordinalPosition, + String dataType, + boolean isNullable, + @Nullable String defaultValue, + @Nullable Integer columnSize, + @Nullable Integer decimalDigits, + @Nullable String columnComment) { + this.columnName = checkNotNull(columnName); this.ordinalPosition = ordinalPosition; - this.type = type; - this.key = key; - this.size = size; - this.scale = scale; - this.defaultValue = defaultValue; + this.dataType = checkNotNull(dataType); this.isNullable = isNullable; - this.comment = comment; + this.defaultValue = defaultValue; + this.columnSize = columnSize; + this.decimalDigits = decimalDigits; + this.columnComment = columnComment; } - public String getName() { - return name; + public String getColumnName() { + return columnName; } public int getOrdinalPosition() { return ordinalPosition; } - public String getType() { - return type; + public String getDataType() { + return dataType; + } + + public boolean isNullable() { + return isNullable; } - @Nullable - public String getKey() { - return key; + public Optional getDefaultValue() { + return Optional.ofNullable(defaultValue); } - @Nullable - public Integer getSize() { - return size; + public Optional getColumnSize() { + return Optional.ofNullable(columnSize); } - @Nullable - public Integer getScale() { - return scale; + public Optional getDecimalDigits() { + return Optional.ofNullable(decimalDigits); } - @Nullable - public String getDefaultValue() { - return defaultValue; + public Optional getColumnComment() { + return Optional.ofNullable(columnComment); } - public boolean isNullable() { - return isNullable; + @Override + public String toString() { + return "StarRocksColumn{" + + "columnName='" + columnName + '\'' + + ", ordinalPosition=" + ordinalPosition + + ", dataType='" + dataType + '\'' + + ", isNullable=" + isNullable + + ", defaultValue='" + defaultValue + '\'' + + ", columnSize=" + columnSize + + ", decimalDigits=" + decimalDigits + + ", columnComment='" + columnComment + '\'' + + '}'; } - @Nullable - public String getComment() { - return comment; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StarRocksColumn column = (StarRocksColumn) o; + return ordinalPosition == column.ordinalPosition && isNullable == column.isNullable && + Objects.equals(columnName, column.columnName) && + dataType.equalsIgnoreCase(column.dataType) && + Objects.equals(defaultValue, column.defaultValue) && + Objects.equals(columnSize, column.columnSize) && + Objects.equals(decimalDigits, column.decimalDigits) && + Objects.equals(columnComment, column.columnComment); } + /** Build a {@link StarRocksColumn}. 
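+ *
+ * <p>A minimal usage sketch (the column name and type here are illustrative only, not part
+ * of this change):
+ *
+ * <pre>{@code
+ * StarRocksColumn idColumn =
+ *         new StarRocksColumn.Builder()
+ *                 .setColumnName("id")
+ *                 .setOrdinalPosition(0)
+ *                 .setDataType("BIGINT")
+ *                 .setNullable(false)
+ *                 .build();
+ * }</pre>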
*/ public static class Builder { - private String name; + private String columnName; private int ordinalPosition; - private String type; - private String key; - private Integer size; - private Integer scale; + private String dataType; + private boolean isNullable = true; private String defaultValue; - boolean isNullable; - private String comment; + private Integer columnSize; + private Integer decimalDigits; + private String columnComment; - public Builder setName(String name) { - this.name = name; + public StarRocksColumn.Builder setColumnName(String columnName) { + this.columnName = columnName; return this; } - public Builder setOrdinalPosition(int ordinalPosition) { + public StarRocksColumn.Builder setOrdinalPosition(int ordinalPosition) { this.ordinalPosition = ordinalPosition; return this; } - public Builder setType(String type) { - this.type = type; + public StarRocksColumn.Builder setDataType(String dataType) { + this.dataType = dataType; return this; } - public Builder setKey(String key) { - this.key = key; + public StarRocksColumn.Builder setNullable(boolean isNullable) { + this.isNullable = isNullable; return this; } - public Builder setSize(Integer size) { - this.size = size; - return this; - } - - public Builder setScale(Integer scale) { - this.scale = scale; + public StarRocksColumn.Builder setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; return this; } - public Builder setDefaultValue(String defaultValue) { - this.defaultValue = defaultValue; + public StarRocksColumn.Builder setColumnSize(Integer columnSize) { + this.columnSize = columnSize; return this; } - public Builder setNullable(boolean nullable) { - isNullable = nullable; + public StarRocksColumn.Builder setDecimalDigits(Integer decimalDigits) { + this.decimalDigits = decimalDigits; return this; } - public Builder setComment(String comment) { - this.comment = comment; + public StarRocksColumn.Builder setColumnComment(String columnComment) { + this.columnComment = columnComment; return this; } public StarRocksColumn build() { - return new StarRocksColumn(name, ordinalPosition, type, key, size, scale, - defaultValue, isNullable, comment); + return new StarRocksColumn( + columnName, + ordinalPosition, + dataType, + isNullable, + defaultValue, + columnSize, + decimalDigits, + columnComment); } } } \ No newline at end of file diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java deleted file mode 100644 index cc94821c..00000000 --- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksSchema.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2021-present StarRocks, Inc. All rights reserved. - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.catalog;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** Describe the schema of a StarRocks table. */
-public class StarRocksSchema implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    // Have been sorted by the ordinal positions of columns.
-    private final List<StarRocksColumn> columns;
-    // Map column names to the Column structure.
-    private final Map<String, StarRocksColumn> columnMap;
-
-    private StarRocksSchema(List<StarRocksColumn> columns) {
-        this.columns = new ArrayList<>(columns);
-        this.columns.sort(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition));
-        this.columnMap = new HashMap<>();
-        this.columns.forEach(column -> columnMap.put(column.getName(), column));
-    }
-
-    public StarRocksColumn getColumn(String columnName) {
-        return columnMap.get(columnName);
-    }
-
-    public List<StarRocksColumn> getColumns() {
-        return columns;
-    }
-
-    public static class Builder {
-
-        private final List<StarRocksColumn> columns = new ArrayList<>();
-
-        public Builder addColumn(StarRocksColumn column) {
-            columns.add(column);
-            return this;
-        }
-
-        public StarRocksSchema build() {
-            Preconditions.checkState(!columns.isEmpty(), "There should be at least one column.");
-            return new StarRocksSchema(columns);
-        }
-    }
-}
diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java
index 83c523de..aa81a1d5 100644
--- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java
+++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksTable.java
@@ -22,152 +22,200 @@

 import org.apache.flink.util.Preconditions;

-import java.io.Serializable;
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
+import java.util.Optional;

-/** Describe a StarRocks table. */
-public class StarRocksTable implements Serializable {
-    enum TableType {
+/**
+ * Describes a StarRocks table. See StarRocks table
+ * design for how to define a StarRocks table.
+ */
+public class StarRocksTable {
+
+    /**
+     * Types of StarRocks tables. See StarRocks Table Types.
+     */
+    public enum TableType {
         UNKNOWN,
-        DUPLICATE,
+        DUPLICATE_KEY,
         AGGREGATE,
-        UNIQUE,
-        PRIMARY
+        UNIQUE_KEY,
+        PRIMARY_KEY
     }

-    private static final long serialVersionUID = 1L;
-
-    private final String database;
-
-    private final String table;
+    /** The database name. */
+    private final String databaseName;

-    private final StarRocksSchema schema;
+    /** The table name. */
+    private final String tableName;

+    /** The type of the StarRocks table. */
     private final TableType tableType;

-    // Following fields can be nullable if we can not get the metas from StarRocks
+    /** The columns sorted by the ordinal position. */
+    private final List<StarRocksColumn> columns;

-    // For different tables, have different meaning. It's duplicate keys for duplicate table,
-    // aggregate keys for aggregate table, unique keys for unique table, and primary keys for
-    // primary table. The list has been sorted by the ordinal positions of the columns.
-    @Nullable
-    private final List<String> tableKeys;
+    /**
+     * The table keys sorted by the ordinal position. null if it's unknown. The table keys have a
+     * different meaning for each type of table: duplicate keys for a duplicate key table,
+     * aggregate keys for an aggregate table, unique keys for a unique key table, and primary
+     * keys for a primary key table.
+     */
+    @Nullable private final List<String> tableKeys;

-    @Nullable
-    private final List<String> partitionKeys;
+    /** The distribution keys. null if it's unknown. */
+    @Nullable private final List<String> distributionKeys;

-    @Nullable
-    private final Integer numBuckets;
+    /** The number of buckets. null if it's unknown or automatic. */
+    @Nullable private final Integer numBuckets;

-    @Nullable
-    public final String comment;
+    /** The table comment. null if there is no comment or it's unknown. */
+    @Nullable private final String comment;

-    @Nullable
+    /** The properties of the table. */
     private final Map<String, String> properties;

-    private StarRocksTable(String database, String table, StarRocksSchema schema, TableType tableType,
-            @Nullable List<String> tableKeys, @Nullable List<String> partitionKeys,
-            @Nullable Integer numBuckets, @Nullable String comment,
-            @Nullable Map<String, String> properties) {
-        this.database = Preconditions.checkNotNull(database);
-        this.table = Preconditions.checkNotNull(table);
-        this.schema = Preconditions.checkNotNull(schema);
-        this.tableType = Preconditions.checkNotNull(tableType);
+    /** Map the column name to the column. May be lazily initialized. */
+    @Nullable private volatile Map<String, StarRocksColumn> columnMap;
+
+    private StarRocksTable(
+            String databaseName,
+            String tableName,
+            TableType tableType,
+            List<StarRocksColumn> columns,
+            @Nullable List<String> tableKeys,
+            @Nullable List<String> distributionKeys,
+            @Nullable Integer numBuckets,
+            @Nullable String comment,
+            Map<String, String> properties) {
+        Preconditions.checkNotNull(databaseName);
+        Preconditions.checkNotNull(tableName);
+        Preconditions.checkNotNull(tableType);
+        Preconditions.checkArgument(columns != null && !columns.isEmpty());
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.tableType = tableType;
+        this.columns = columns;
         this.tableKeys = tableKeys;
-        this.partitionKeys = partitionKeys;
+        this.distributionKeys = distributionKeys;
         this.numBuckets = numBuckets;
         this.comment = comment;
-        this.properties = properties;
+        this.properties = Preconditions.checkNotNull(properties);
     }

-    public String getDatabase() {
-        return database;
+    public String getDatabaseName() {
+        return databaseName;
     }

-    public String getTable() {
-        return table;
+    public String getTableName() {
+        return tableName;
     }

-    public StarRocksSchema getSchema() {
-        return schema;
-    }
-
-    @Nullable
     public TableType getTableType() {
         return tableType;
     }

-    @Nullable
-    public List<String> getTableKeys() {
-        return tableKeys;
+    public List<StarRocksColumn> getColumns() {
+        return columns;
+    }
+
+    public Optional<List<String>> getTableKeys() {
+        return Optional.ofNullable(tableKeys);
     }

-    @Nullable
-    public List<String> getPartitionKeys() {
-        return partitionKeys;
+    public Optional<List<String>> getDistributionKeys() {
+        return Optional.ofNullable(distributionKeys);
     }

-    @Nullable
-    public Integer getNumBuckets() {
-        return numBuckets;
+    public Optional<Integer> getNumBuckets() {
+        return Optional.ofNullable(numBuckets);
     }

-    @Nullable
-    public String getComment() {
-        return comment;
+    public Optional<String> getComment() {
+        return Optional.ofNullable(comment);
     }

-    @Nullable
     public Map<String, String> getProperties() {
         return properties;
     }

+    public StarRocksColumn getColumn(String columnName) {
+        if (columnMap == null) {
+            synchronized (this) {
+                if (columnMap == null) {
+                    columnMap = new HashMap<>();
+                    for (StarRocksColumn column : columns) {
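+                        // Index columns by name; the volatile field plus double-checked
+                        // locking ensures the map is built at most once.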
columnMap.put(column.getColumnName(), column); + } + } + } + } + return columnMap.get(columnName); + } + + @Override + public String toString() { + return "StarRocksTable{" + + "databaseName='" + databaseName + '\'' + + ", tableName='" + tableName + '\'' + + ", tableType=" + tableType + + ", columns=" + columns + + ", tableKeys=" + tableKeys + + ", distributionKeys=" + distributionKeys + + ", numBuckets=" + numBuckets + + ", comment='" + comment + '\'' + + ", properties=" + properties + + ", columnMap=" + columnMap + + '}'; + } + + /** Build a {@link StarRocksTable}. */ public static class Builder { - private String database; - private String table; - private StarRocksSchema schema; + private String databaseName; + private String tableName; private TableType tableType; + private List columns = new ArrayList<>(); private List tableKeys; - private List partitionKeys; + private List distributionKeys; private Integer numBuckets; private String comment; - private Map properties; + private Map properties = new HashMap<>(); - public Builder setDatabase(String database) { - this.database = database; + public Builder setDatabaseName(String databaseName) { + this.databaseName = databaseName; return this; } - public Builder setTable(String table) { - this.table = table; + public Builder setTableName(String tableName) { + this.tableName = tableName; return this; } - public Builder setSchema(StarRocksSchema schema) { - this.schema = schema; + public Builder setTableType(TableType tableType) { + this.tableType = tableType; return this; } - public Builder setTableType(TableType tableType) { - this.tableType = tableType; + public Builder setColumns(List columns) { + this.columns = columns; return this; } public Builder setTableKeys(List tableKeys) { - this.tableKeys = new ArrayList<>(tableKeys); + this.tableKeys = tableKeys; return this; } - public Builder setPartitionKeys(List partitionKeys) { - this.partitionKeys = new ArrayList<>(partitionKeys); + public Builder setDistributionKeys(List distributionKeys) { + this.distributionKeys = distributionKeys; return this; } @@ -182,30 +230,22 @@ public Builder setComment(String comment) { } public Builder setTableProperties(Map properties) { - this.properties = new HashMap<>(properties); + this.properties = properties; return this; } public StarRocksTable build() { - Preconditions.checkNotNull(database, "database can't be null"); - Preconditions.checkNotNull(table, "table can't be null"); - Preconditions.checkNotNull(schema, "schema can't be null"); - Preconditions.checkNotNull(tableType, "table type can't be null"); - - if (tableKeys != null) { - List tableKeyColumns = new ArrayList<>(tableKeys.size()); - for (String name : tableKeys) { - StarRocksColumn column = schema.getColumn(name); - Preconditions.checkNotNull(column, - String.format("%s.%s does not contain column %s", database, table, name)); - tableKeyColumns.add(column); - } - tableKeyColumns.sort(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition)); - tableKeys = tableKeyColumns.stream().map(StarRocksColumn::getName).collect(Collectors.toList()); - } - - return new StarRocksTable(database, table, schema, tableType, tableKeys, partitionKeys, - numBuckets, comment, properties); + return new StarRocksTable( + databaseName, + tableName, + tableType, + columns, + tableKeys, + distributionKeys, + numBuckets, + comment, + properties); } } } + diff --git a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java 
index 6075f0e5..e59231e0 100644 --- a/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java +++ b/src/main/java/com/starrocks/connector/flink/catalog/StarRocksUtils.java @@ -28,86 +28,14 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; +import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; -import javax.annotation.Nullable; +import java.util.Map; public class StarRocksUtils { - private static final String TABLE_SCHEMA_QUERY = - "SELECT `COLUMN_NAME`, `DATA_TYPE`, `ORDINAL_POSITION`, `COLUMN_SIZE`, `DECIMAL_DIGITS`, " + - "`COLUMN_DEFAULT`, `IS_NULLABLE`, `COLUMN_KEY` FROM `information_schema`.`COLUMNS` " + - "WHERE `TABLE_SCHEMA`=? AND `TABLE_NAME`=?;"; - - public static StarRocksSchema getStarRocksSchema(Connection connection, String database, String table) throws Exception { - StarRocksSchema.Builder schemaBuilder = new StarRocksSchema.Builder(); - try (PreparedStatement statement = connection.prepareStatement(TABLE_SCHEMA_QUERY)) { - statement.setObject(1, database); - statement.setObject(2, table); - try (ResultSet resultSet = statement.executeQuery()) { - while (resultSet.next()) { - String name = resultSet.getString("COLUMN_NAME"); - String type = resultSet.getString("DATA_TYPE"); - int position = resultSet.getInt("ORDINAL_POSITION"); - Integer size = resultSet.getInt("COLUMN_SIZE"); - if (resultSet.wasNull()) { - size = null; - } - Integer scale = resultSet.getInt("DECIMAL_DIGITS"); - if (resultSet.wasNull()) { - scale = null; - } - String defaultValue = resultSet.getString("COLUMN_DEFAULT"); - String isNullable = resultSet.getString("IS_NULLABLE"); - String columnKey = resultSet.getString("COLUMN_KEY"); - StarRocksColumn column = new StarRocksColumn.Builder() - .setName(name) - .setOrdinalPosition(position - 1) - .setType(type) - .setKey(columnKey) - .setSize(size) - .setScale(scale) - .setDefaultValue(defaultValue) - .setNullable(isNullable == null || !isNullable.equalsIgnoreCase("NO")) - .setComment(null) - .build(); - schemaBuilder.addColumn(column); - } - } - } - - return schemaBuilder.build(); - } - - public static List getPrimaryKeys(StarRocksSchema starRocksSchema) { - return starRocksSchema.getColumns().stream() - .filter(column -> "PRI".equalsIgnoreCase(column.getKey())) - // In StarRocks, primary keys must be declared in the same order of that in schema - .sorted(Comparator.comparingInt(StarRocksColumn::getOrdinalPosition)) - .map(StarRocksColumn::getName) - .collect(Collectors.toList()); - } - - public static StarRocksTable getStarRocksTable(Connection connection, String database, String table) throws Exception { - StarRocksSchema starRocksSchema = StarRocksUtils.getStarRocksSchema(connection, database, table); - // could be empty if this is not a primary key table - List primaryKeys = StarRocksUtils.getPrimaryKeys(starRocksSchema); - StarRocksTable.TableType tableType = primaryKeys.isEmpty() ? - StarRocksTable.TableType.UNKNOWN : StarRocksTable.TableType.PRIMARY; - return new StarRocksTable.Builder() - .setDatabase(database) - .setTable(table) - .setSchema(starRocksSchema) - .setTableType(tableType) - .setTableKeys(primaryKeys.isEmpty() ? 
null : primaryKeys) - .build(); - } - public static StarRocksTable toStarRocksTable( String catalogName, ObjectPath tablePath, @@ -120,33 +48,47 @@ public static StarRocksTable toStarRocksTable( catalogName, tablePath.getFullName())); } RowType rowType = (RowType) flinkSchema.toPhysicalRowDataType().getLogicalType(); - StarRocksSchema.Builder starRocksSchemaBuilder = new StarRocksSchema.Builder(); - for (int i = 0; i < rowType.getFieldCount(); i++) { - RowType.RowField field = rowType.getFields().get(i); + Map nameToFieldMap = new HashMap<>(); + for (RowType.RowField field : rowType.getFields()) { + nameToFieldMap.put(field.getName(), field); + } + List primaryKeys = flinkSchema.getPrimaryKey() + .map(pk -> pk.getColumns()) + .orElse(Collections.emptyList()); + Preconditions.checkState(!primaryKeys.isEmpty()); + + List orderedFields = new ArrayList<>(); + for (String primaryKey : primaryKeys) { + orderedFields.add(nameToFieldMap.get(primaryKey)); + } + for (RowType.RowField field : rowType.getFields()) { + if (!primaryKeys.contains(field.getName())) { + orderedFields.add(field); + } + } + + List starRocksColumns = new ArrayList<>(); + for (int i = 0; i < orderedFields.size(); i++) { + RowType.RowField field = orderedFields.get(i); StarRocksColumn.Builder columnBuilder = new StarRocksColumn.Builder() - .setName(field.getName()) + .setColumnName(field.getName()) .setOrdinalPosition(i); TypeUtils.toStarRocksType(columnBuilder, field.getType()); - starRocksSchemaBuilder.addColumn(columnBuilder.build()); + starRocksColumns.add(columnBuilder.build()); } - StarRocksSchema starRocksSchema = starRocksSchemaBuilder.build(); - StarRocksTable.Builder starRocksTableBuilder = new StarRocksTable.Builder() - .setDatabase(tablePath.getDatabaseName()) - .setTable(tablePath.getObjectName()) - .setSchema(starRocksSchema); - List primaryKeys = flinkSchema.getPrimaryKey() - .map(pk -> pk.getColumns()) - .orElse(Collections.emptyList()); - // have verified it's a primary key table above - Preconditions.checkState(!primaryKeys.isEmpty()); - starRocksTableBuilder.setTableType(StarRocksTable.TableType.PRIMARY); - starRocksTableBuilder.setTableKeys(primaryKeys); + StarRocksTable.Builder starRocksTableBuilder = new StarRocksTable.Builder() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getObjectName()) + .setTableType(StarRocksTable.TableType.PRIMARY_KEY) + .setColumns(starRocksColumns) + .setTableKeys(primaryKeys) + .setDistributionKeys(primaryKeys) + .setComment(flinkTable.getComment()); if (tableBaseConfig.contains(CatalogOptions.TABLE_NUM_BUCKETS)) { starRocksTableBuilder.setNumBuckets(tableBaseConfig.get(CatalogOptions.TABLE_NUM_BUCKETS)); } - starRocksTableBuilder.setComment(flinkTable.getComment()); starRocksTableBuilder.setTableProperties( ConfigUtils.getPrefixConfigs( CatalogOptions.TABLE_PROPERTIES_PREFIX, @@ -156,73 +98,4 @@ public static StarRocksTable toStarRocksTable( ); return starRocksTableBuilder.build(); } - - public static String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) { - Preconditions.checkState(table.getTableType() == StarRocksTable.TableType.PRIMARY); - StringBuilder builder = new StringBuilder(); - builder.append( - String.format("CREATE TABLE %s`%s`.`%s`", - ignoreIfExists ? 
"IF NOT EXISTS " : "", - table.getDatabase(), - table.getTable()) - ); - builder.append(" (\n"); - StarRocksSchema schema = table.getSchema(); - String columnsStmt = schema.getColumns().stream().map(StarRocksUtils::buildColumnStatement) - .collect(Collectors.joining(",\n")); - builder.append(columnsStmt); - builder.append("\n) "); - String primaryKeys = table.getTableKeys().stream() - .map(pk -> "`" + pk + "`").collect(Collectors.joining(", ")); - builder.append(String.format("PRIMARY KEY (%s)\n", primaryKeys)); - builder.append(String.format("DISTRIBUTED BY HASH (%s)", primaryKeys)); - if (table.getNumBuckets() != null) { - builder.append(" BUCKETS "); - builder.append(table.getNumBuckets()); - } - if (table.getProperties() != null && !table.getProperties().isEmpty()) { - builder.append("\nPROPERTIES (\n"); - String properties = table.getProperties().entrySet().stream() - .map(entry -> String.format("\"%s\" = \"%s\"", entry.getKey(), entry.getValue())) - .collect(Collectors.joining(",\n")); - builder.append(properties); - builder.append("\n)"); - } - builder.append(";"); - return builder.toString(); - } - - public static String buildColumnStatement(StarRocksColumn column) { - StringBuilder builder = new StringBuilder(); - builder.append("`"); - builder.append(column.getName()); - builder.append("` "); - builder.append(getFullColumnType(column.getType(), column.getSize(), column.getScale())); - builder.append(" "); - builder.append(column.isNullable() ? "NULL" : "NOT NULL"); - if (column.getDefaultValue() != null) { - builder.append(" DEFAULT '"); - builder.append(column.getDefaultValue()); - builder.append("'"); - } - if (column.getComment() != null) { - builder.append(" COMMENT \""); - builder.append(column.getComment()); - builder.append("\""); - } - return builder.toString(); - } - - public static String getFullColumnType(String type, @Nullable Integer size, @Nullable Integer scale) { - String dataType = type.toUpperCase(); - switch (dataType) { - case "DECIMAL": - return String.format("DECIMAL(%d, %s)", size, scale); - case "CHAR": - case "VARCHAR": - return String.format("%s(%d)", dataType, size); - default: - return dataType; - } - } } diff --git a/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java b/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java index a471886a..e00326ef 100644 --- a/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java +++ b/src/main/java/com/starrocks/connector/flink/catalog/TypeUtils.java @@ -36,7 +36,6 @@ import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.table.types.logical.ZonedTimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.util.Preconditions; @@ -135,89 +134,96 @@ public FlinkLogicalTypeVisitor(StarRocksColumn.Builder builder) { @Override public StarRocksColumn.Builder visit(CharType charType) { - builder.setType(CHAR); - builder.setSize(charType.getLength()); + builder.setDataType(CHAR); + builder.setColumnSize(charType.getLength()); + builder.setNullable(charType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(VarCharType varCharType) { - builder.setType(VARCHAR); - builder.setSize(Math.min(varCharType.getLength(), MAX_VARCHAR_SIZE)); + builder.setDataType(VARCHAR); + builder.setColumnSize(Math.min(varCharType.getLength(), MAX_VARCHAR_SIZE)); + 
builder.setNullable(varCharType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(BooleanType booleanType) { - builder.setType(BOOLEAN); + builder.setDataType(BOOLEAN); + builder.setNullable(booleanType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(DecimalType decimalType) { - builder.setType(DECIMAL); - builder.setSize(decimalType.getPrecision()); - builder.setScale(decimalType.getScale()); + builder.setDataType(DECIMAL); + builder.setColumnSize(decimalType.getPrecision()); + builder.setDecimalDigits(decimalType.getScale()); + builder.setNullable(decimalType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(TinyIntType tinyIntType) { - builder.setType(TINYINT); + builder.setDataType(TINYINT); + builder.setNullable(tinyIntType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(SmallIntType smallIntType) { - builder.setType(SMALLINT); + builder.setDataType(SMALLINT); + builder.setNullable(smallIntType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(IntType intType) { - builder.setType(INT); + builder.setDataType(INT); + builder.setNullable(intType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(BigIntType bigIntType) { - builder.setType(BIGINT); + builder.setDataType(BIGINT); + builder.setNullable(bigIntType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(FloatType floatType) { - builder.setType(FLOAT); + builder.setDataType(FLOAT); + builder.setNullable(floatType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(DoubleType doubleType) { - builder.setType(DOUBLE); + builder.setDataType(DOUBLE); + builder.setNullable(doubleType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(DateType dateType) { - builder.setType(DATE); + builder.setDataType(DATE); + builder.setNullable(dateType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(TimestampType timestampType) { - builder.setType(DATETIME); - return builder; - } - - @Override - public StarRocksColumn.Builder visit(ZonedTimestampType zonedTimestampType) { - builder.setType(DATETIME); + builder.setDataType(DATETIME); + builder.setNullable(timestampType.isNullable()); return builder; } @Override public StarRocksColumn.Builder visit(LocalZonedTimestampType localZonedTimestampType) { - builder.setType(DATETIME); + builder.setDataType(DATETIME); + builder.setNullable(localZonedTimestampType.isNullable()); return builder; } diff --git a/src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java b/src/test/java/com/starrocks/connector/flink/catalog/FlinkCatalogFactoryTest.java similarity index 76% rename from src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java rename to src/test/java/com/starrocks/connector/flink/catalog/FlinkCatalogFactoryTest.java index a3a3c7e8..99698018 100644 --- a/src/test/java/com/starrocks/connector/flink/catalog/StarRocksCatalogFactoryTest.java +++ b/src/test/java/com/starrocks/connector/flink/catalog/FlinkCatalogFactoryTest.java @@ -30,7 +30,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class StarRocksCatalogFactoryTest { +public class FlinkCatalogFactoryTest { @Test public void testCreateCatalog() { @@ -54,26 +54,26 @@ public void testCreateCatalog() { null, Thread.currentThread().getContextClassLoader() ); - 
assertTrue(catalog instanceof StarRocksCatalog); - StarRocksCatalog starRocksCatalog = (StarRocksCatalog) catalog; - assertEquals("test_catalog", starRocksCatalog.getName()); - assertEquals("starrocks_db", starRocksCatalog.getDefaultDatabase()); - assertEquals("jdbc:mysql://127.0.0.1:11903", starRocksCatalog.getJdbcUrl()); - assertEquals("127.0.0.1:11901", starRocksCatalog.getHttpUrl()); - assertEquals("root", starRocksCatalog.getUsername()); - assertEquals("123456", starRocksCatalog.getPassword()); + assertTrue(catalog instanceof FlinkCatalog); + FlinkCatalog flinkCatalog = (FlinkCatalog) catalog; + assertEquals("test_catalog", flinkCatalog.getName()); + assertEquals("starrocks_db", flinkCatalog.getDefaultDatabase()); + assertEquals("jdbc:mysql://127.0.0.1:11903", flinkCatalog.getJdbcUrl()); + assertEquals("127.0.0.1:11901", flinkCatalog.getHttpUrl()); + assertEquals("root", flinkCatalog.getUsername()); + assertEquals("123456", flinkCatalog.getPassword()); - Map sourceBaseConfig = starRocksCatalog.getSourceBaseConfig().toMap(); + Map sourceBaseConfig = flinkCatalog.getSourceBaseConfig().toMap(); assertEquals(2, sourceBaseConfig.size()); assertEquals("2000", sourceBaseConfig.get("scan.connect.timeout-ms")); assertEquals("8192", sourceBaseConfig.get("scan.params.batch-rows")); - Map sinkBaseConfig = starRocksCatalog.getSinkBaseConfig().toMap(); + Map sinkBaseConfig = flinkCatalog.getSinkBaseConfig().toMap(); assertEquals(2, sinkBaseConfig.size()); assertEquals("exactly-once", sinkBaseConfig.get("sink.semantic")); assertEquals("json", sinkBaseConfig.get("sink.properties.format")); - Map tableBaseConfig = starRocksCatalog.getTableBaseConfig().toMap(); + Map tableBaseConfig = flinkCatalog.getTableBaseConfig().toMap(); assertEquals(2, tableBaseConfig.size()); assertEquals("10", tableBaseConfig.get("table.num-buckets")); assertEquals("1", tableBaseConfig.get("table.properties.replication_num")); diff --git a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java index f80a9059..03d22056 100644 --- a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java +++ b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java @@ -52,6 +52,8 @@ public abstract class StarRocksITTestBase { protected static String DB_NAME; protected static String HTTP_URLS; protected static String JDBC_URLS; + protected static String USERNAME; + protected static String PASSWORD; protected static String getHttpUrls() { return HTTP_URLS; @@ -69,11 +71,13 @@ protected static String getJdbcUrl() { public static void setUp() throws Exception { HTTP_URLS = DEBUG_MODE ? "127.0.0.1:11901" : System.getProperty("http_urls"); JDBC_URLS = DEBUG_MODE ? "jdbc:mysql://127.0.0.1:11903" : System.getProperty("jdbc_urls"); + USERNAME = DEBUG_MODE ? "root" : System.getProperty("username"); + PASSWORD = DEBUG_MODE ? 
"" : System.getProperty("password"); assumeTrue(HTTP_URLS != null && JDBC_URLS != null); DB_NAME = "sr_test_" + genRandomUuid(); try { - DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), "root", ""); + DB_CONNECTION = DriverManager.getConnection(getJdbcUrl(), USERNAME, PASSWORD); LOG.info("Success to create db connection via jdbc {}", getJdbcUrl()); } catch (Exception e) { LOG.error("Failed to create db connection via jdbc {}", getJdbcUrl(), e); diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java new file mode 100644 index 00000000..a484034f --- /dev/null +++ b/src/test/java/com/starrocks/connector/flink/it/catalog/FlinkCatalogTest.java @@ -0,0 +1,386 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.connector.flink.it.catalog; + +import com.starrocks.connector.flink.catalog.FlinkCatalog; +import com.starrocks.connector.flink.it.StarRocksITTestBase; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static com.starrocks.connector.flink.catalog.TypeUtils.MAX_VARCHAR_SIZE; +import static com.starrocks.connector.flink.catalog.TypeUtils.STRING_SIZE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FlinkCatalogTest extends 
StarRocksITTestBase {
+
+    @Test
+    public void testDatabase() throws Exception {
+        FlinkCatalog catalog = createCatalog("test_db_catalog");
+        String db1 = "sr_test_" + genRandomUuid();
+        String db2 = "sr_test_" + genRandomUuid();
+        String db3 = "sr_test_" + genRandomUuid();
+        createDatabase(db1);
+        createDatabase(db2);
+
+        List<String> expectDbList = listDatabase();
+        expectDbList.sort(String::compareTo);
+        List<String> actualDbList = catalog.listDatabases();
+        actualDbList.sort(String::compareTo);
+        assertEquals(expectDbList, actualDbList);
+
+        assertNotNull(catalog.getDatabase(db1));
+        assertNotNull(catalog.getDatabase(db2));
+        try {
+            catalog.getDatabase(db3);
+            fail("Should fail because the db does not exist");
+        } catch (Exception e) {
+            assertTrue(e instanceof DatabaseNotExistException);
+        }
+
+        assertTrue(catalog.databaseExists(db1));
+        assertTrue(catalog.databaseExists(db2));
+        assertFalse(catalog.databaseExists(db3));
+
+        DATABASE_SET_TO_CLEAN.add(db3);
+        catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false);
+        assertTrue(catalog.databaseExists(db3));
+        // test that ignoreIfExists is true for an already existing db
+        catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
+        try {
+            catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false);
+            fail("Should fail because the db already exists");
+        } catch (Exception e) {
+            assertTrue(e instanceof DatabaseAlreadyExistException);
+        }
+    }
+
+    @Test
+    public void testListAndExistTable() throws Exception {
+        FlinkCatalog catalog = createCatalog("test_table_catalog");
+
+        String db = "sr_test_" + genRandomUuid();
+        DATABASE_SET_TO_CLEAN.add(db);
+        createDatabase(db);
+        assertTrue(catalog.databaseExists(db));
+
+        String tbl1 = createAllTypesTable(db, "tbl1");
+        String tbl2 = createAllTypesTable(db, "tbl2");
+        String tbl3 = "tbl3_" + genRandomUuid();
+
+        List<String> expectTables = Arrays.asList(tbl1, tbl2);
+        expectTables.sort(String::compareTo);
+        List<String> actualTables = catalog.listTables(db);
+        actualTables.sort(String::compareTo);
+        assertEquals(expectTables, actualTables);
+        String dbNotExist = "sr_test_" + genRandomUuid();
+        try {
+            catalog.listTables(dbNotExist);
+            fail("Should fail because the db does not exist");
+        } catch (Exception e) {
+            assertTrue(e instanceof DatabaseNotExistException);
+        }
+
+        assertTrue(catalog.tableExists(new ObjectPath(db, tbl1)));
+        assertTrue(catalog.tableExists(new ObjectPath(db, tbl2)));
+        assertFalse(catalog.tableExists(new ObjectPath(db, tbl3)));
+        assertFalse(catalog.tableExists(new ObjectPath(dbNotExist, tbl1)));
+    }
+
+    @Test
+    public void testGetTable() throws Exception {
+        FlinkCatalog catalog = createCatalog("test_table_catalog");
+
+        String db = "sr_test_" + genRandomUuid();
+        DATABASE_SET_TO_CLEAN.add(db);
+        catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
+        assertTrue(catalog.databaseExists(db));
+
+        String tbl = createAllTypesTable(db, "tbl");
+        Schema flinkSchema = createAllTypesFlinkSchema().toSchema();
+
+        assertTrue(catalog.tableExists(new ObjectPath(db, tbl)));
+        CatalogBaseTable table = catalog.getTable(new ObjectPath(db, tbl));
+        assertNotNull(table);
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (int i = 0; i < 11; i++) {
+            Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i);
+            schemaBuilder.column(column.getName(), column.getDataType());
+        }
+        schemaBuilder.column("c11", DataTypes.VARCHAR(STRING_SIZE));
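+        // c11 was created as STRING in StarRocks, which the catalog reports back as
+        // VARCHAR(STRING_SIZE).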
for (int i = 12; i < 15; i++) { + Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i); + schemaBuilder.column(column.getName(), column.getDataType()); + } + schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumnNames()); + Schema expectFlinkSchema = schemaBuilder.build(); + Schema actualFlinkSchema = table.getUnresolvedSchema(); + + assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); + assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); + assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), + actualFlinkSchema.getPrimaryKey().get().getColumnNames()); + + String tblNotExist = "tbl_" + genRandomUuid(); + try { + catalog.getTable(new ObjectPath(db, tblNotExist)); + fail("Should fail because the table does not exist"); + } catch (Exception e) { + assertTrue(e instanceof TableNotExistException); + } + } + + @Test + public void testCreateTable() throws Exception { + Configuration tableConf = new Configuration(); + tableConf.setString("table.properties.replication_num", "1"); + FlinkCatalog catalog = createCatalog("test_table_catalog", tableConf); + + String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + assertTrue(catalog.databaseExists(db)); + String tbl = "tbl_" + genRandomUuid(); + ObjectPath objectPath = new ObjectPath(db, tbl); + TableSchema flinkSchema = createAllTypesFlinkSchema(); + CatalogBaseTable catalogTable = new CatalogTableImpl(flinkSchema, Collections.emptyMap(), null); + assertFalse(catalog.tableExists(objectPath)); + catalog.createTable(objectPath, catalogTable, false); + assertTrue(catalog.tableExists(objectPath)); + CatalogBaseTable actualCatalogTable = catalog.getTable(objectPath); + List tableColumns = new ArrayList<>(flinkSchema.getTableColumns()); + tableColumns.set(5, TableColumn.physical(tableColumns.get(5).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + tableColumns.set(11, TableColumn.physical(tableColumns.get(11).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + tableColumns.set(14, TableColumn.physical(tableColumns.get(14).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); + TableSchema.Builder schemaBuilder = new TableSchema.Builder(); + tableColumns.forEach(schemaBuilder::add); + schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumns().toArray(new String[0])); + Schema expectFlinkSchema = schemaBuilder.build().toSchema(); + Schema actualFlinkSchema = actualCatalogTable.getUnresolvedSchema(); + assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); + assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); + assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), + actualFlinkSchema.getPrimaryKey().get().getColumnNames()); + + try { + catalog.createTable(objectPath, catalogTable, false); + fail("Should fail because the table exists"); + } catch (Exception e) { + assertTrue(e instanceof TableAlreadyExistException); + } + + String dbNotExist = "sr_test_" + genRandomUuid(); + ObjectPath objectPathNotExist = new ObjectPath(dbNotExist, tbl); + try { + catalog.createTable(objectPathNotExist, catalogTable, false); + fail("Should fail because the database does not exist"); + } catch (Exception e) { + assertTrue(e instanceof DatabaseNotExistException); + } + } + + @Test + public void testDropTable() throws Exception { + FlinkCatalog catalog = createCatalog("test_table_catalog"); + + 
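+        // Each test works in its own randomly named database so repeated runs do not collide.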
String db = "sr_test_" + genRandomUuid(); + DATABASE_SET_TO_CLEAN.add(db); + catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); + assertTrue(catalog.databaseExists(db)); + String tbl = createAllTypesTable(db, "tbl"); + + ObjectPath objectPath = new ObjectPath(db, tbl); + assertTrue(catalog.tableExists(objectPath)); + catalog.dropTable(objectPath, false); + assertFalse(catalog.tableExists(objectPath)); + // test ignoreIfNotExists + catalog.dropTable(objectPath, true); + try { + catalog.dropTable(objectPath, false); + fail("Should fail because the table does not exist"); + } catch (Exception e) { + assertTrue(e instanceof TableNotExistException); + } + } + + private String createAllTypesTable(String database, String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createSql = String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 BOOLEAN," + + "c1 TINYINT," + + "c2 SMALLINT," + + "c3 INT," + + "c4 BIGINT," + + "c5 LARGEINT," + + "c6 FLOAT," + + "c7 DOUBLE," + + "c8 DECIMAL(23,5)," + + "c9 CHAR(7)," + + "c10 VARCHAR(109)," + + "c11 STRING," + + "c12 DATE," + + "c13 DATETIME," + + "c14 JSON" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0,c1,c2,c3) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + database, tableName); + executeSrSQL(createSql); + return tableName; + } + + private TableSchema createAllTypesFlinkSchema() { + return new TableSchema.Builder() + .field("c0", DataTypes.BOOLEAN().notNull()) + .field("c1", DataTypes.TINYINT().notNull()) + .field("c2", DataTypes.SMALLINT().notNull()) + .field("c3", DataTypes.INT().notNull()) + .field("c4", DataTypes.BIGINT()) + .field("c5", DataTypes.STRING()) + .field("c6", DataTypes.FLOAT()) + .field("c7", DataTypes.DOUBLE()) + .field("c8", DataTypes.DECIMAL(23, 5)) + .field("c9", DataTypes.CHAR(7)) + .field("c10", DataTypes.VARCHAR(109)) + .field("c11", DataTypes.STRING()) + .field("c12", DataTypes.DATE()) + .field("c13", DataTypes.TIMESTAMP(0)) + .field("c14", DataTypes.STRING()) + .primaryKey("c0", "c1", "c2", "c3") + .build(); + } + + private FlinkCatalog createCatalog(String catalogName) { + return createCatalog(catalogName, new Configuration()); + } + + private FlinkCatalog createCatalog(String catalogName, Configuration tableConf) { + return new FlinkCatalog( + catalogName, + getJdbcUrl(), + getHttpUrls(), + "root", + "", + DB_NAME, + new Configuration(), + new Configuration(), + tableConf, + Thread.currentThread().getContextClassLoader() + ); + } + + @Test + public void testInsertAndSelect() throws Exception { + String tableName = createPkTable("test_read_write"); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + env.setParallelism(1); + + String catalogName = "test_catalog"; + String createCatalogSql = String.format( + "CREATE CATALOG %s WITH (" + + " 'type' = 'starrocks',\n" + + " 'jdbc-url' = '%s',\n" + + " 'http-url' = '%s',\n" + + " 'username' = '%s',\n" + + " 'password' = '%s',\n" + + " 'default-database' = '%s'" + + ");", + catalogName, getJdbcUrl(), getHttpUrls(), "root", "", "default-db"); + tEnv.executeSql(createCatalogSql); + tEnv.executeSql(String.format("USE CATALOG %s;", catalogName)); + tEnv.executeSql(String.format("USE %s;", DB_NAME)); + tEnv.executeSql( + String.format( + "INSERT INTO %s VALUES (1, '100'), (2, '200');", + tableName + )) + .await(); + + List results = + 
CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "SELECT * FROM %s", + tableName)) + .execute() + .collect()); + results.sort(Comparator.comparingInt(row -> (int) row.getField(0))); + + List expectRows = Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, "100"), + Row.ofKind(RowKind.INSERT, 2, "200") + ); + + assertThat(results).isEqualTo(expectRows); + } + + private String createPkTable(String tablePrefix) throws Exception { + String tableName = tablePrefix + "_" + genRandomUuid(); + String createStarRocksTable = + String.format( + "CREATE TABLE `%s`.`%s` (" + + "c0 INT," + + "c1 STRING" + + ") ENGINE = OLAP " + + "PRIMARY KEY(c0) " + + "DISTRIBUTED BY HASH (c0) BUCKETS 8 " + + "PROPERTIES (" + + "\"replication_num\" = \"1\"" + + ")", + DB_NAME, tableName); + executeSrSQL(createStarRocksTable); + return tableName; + } +} diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java index 9dcfb5c3..a4b57173 100644 --- a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java @@ -21,358 +21,37 @@ package com.starrocks.connector.flink.it.catalog; import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.catalog.StarRocksColumn; +import com.starrocks.connector.flink.catalog.StarRocksTable; import com.starrocks.connector.flink.it.StarRocksITTestBase; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; -import org.apache.flink.util.CollectionUtil; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; -import static com.starrocks.connector.flink.catalog.TypeUtils.MAX_VARCHAR_SIZE; -import static com.starrocks.connector.flink.catalog.TypeUtils.STRING_SIZE; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +/** Tests for {@link StarRocksCatalog}. 
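+ * The Flink-facing catalog behavior that used to live here is now covered by FlinkCatalogTest.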
diff --git a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java
index 9dcfb5c3..a4b57173 100644
--- a/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/catalog/StarRocksCatalogTest.java
@@ -21,358 +21,37 @@ package com.starrocks.connector.flink.it.catalog;
 
 import com.starrocks.connector.flink.catalog.StarRocksCatalog;
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
+import com.starrocks.connector.flink.catalog.StarRocksTable;
 import com.starrocks.connector.flink.it.StarRocksITTestBase;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.CollectionUtil;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 
-import static com.starrocks.connector.flink.catalog.TypeUtils.MAX_VARCHAR_SIZE;
-import static com.starrocks.connector.flink.catalog.TypeUtils.STRING_SIZE;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+/** Tests for {@link StarRocksCatalog}. */
 public class StarRocksCatalogTest extends StarRocksITTestBase {
 
-    @Test
-    public void testDatabase() throws Exception {
-        StarRocksCatalog catalog = createCatalog("test_db_catalog");
-        String db1 = "sr_test_" + genRandomUuid();
-        String db2 = "sr_test_" + genRandomUuid();
-        String db3 = "sr_test_" + genRandomUuid();
-        createDatabase(db1);
-        createDatabase(db2);
-
-        List<String> expectDbList = listDatabase();
-        expectDbList.sort(String::compareTo);
-        List<String> actualDbList = catalog.listDatabases();
-        actualDbList.sort(String::compareTo);
-        assertEquals(expectDbList, actualDbList);
-
-        assertNotNull(catalog.getDatabase(db1));
-        assertNotNull(catalog.getDatabase(db2));
-        try {
-            catalog.getDatabase(db3);
-            fail("Should fail because the db does not exist");
-        } catch (Exception e) {
-            assertTrue(e instanceof DatabaseNotExistException);
-        }
-
-        assertTrue(catalog.databaseExists(db1));
-        assertTrue(catalog.databaseExists(db2));
-        assertFalse(catalog.databaseExists(db3));
-
-        DATABASE_SET_TO_CLEAN.add(db3);
-        catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false);
-        assertTrue(catalog.databaseExists(db3));
-        // test ignoreIfExists is true for already existed db
-        catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
-        try {
-            catalog.createDatabase(db3, new CatalogDatabaseImpl(Collections.emptyMap(), null), false);
-            fail("Should fail because the db already exists");
-        } catch (Exception e) {
-            assertTrue(e instanceof DatabaseAlreadyExistException);
-        }
-    }
-
-    @Test
-    public void testListAndExistTable() throws Exception {
-        StarRocksCatalog catalog = createCatalog("test_table_catalog");
-
-        String db = "sr_test_" + genRandomUuid();
-        DATABASE_SET_TO_CLEAN.add(db);
-        createDatabase(db);
-        assertTrue(catalog.databaseExists(db));
-
-        String tbl1 = createAllTypesTable(db, "tbl1");
-        String tbl2 = createAllTypesTable(db, "tbl2");
-        String tbl3 = "tbl3_" + genRandomUuid();
-
-        List<String> expectTables = Arrays.asList(tbl1, tbl2);
-        expectTables.sort(String::compareTo);
-        List<String> actualTables = catalog.listTables(db);
-        actualTables.sort(String::compareTo);
-        assertEquals(expectTables, actualTables);
-        String dbNotExist = "sr_test_" + genRandomUuid();
-        try {
-            catalog.listTables(dbNotExist);
-            fail("Should failed because the db doest not exist");
-        } catch (Exception e) {
-            assertTrue(e instanceof DatabaseNotExistException);
-        }
-
-        assertTrue(catalog.tableExists(new ObjectPath(db, tbl1)));
-        assertTrue(catalog.tableExists(new ObjectPath(db, tbl2)));
-        assertFalse(catalog.tableExists(new ObjectPath(db, tbl3)));
-        assertFalse(catalog.tableExists(new ObjectPath(dbNotExist, tbl1)));
-    }
-
-    @Test
-    public void testGetTable() throws Exception {
-        StarRocksCatalog catalog = createCatalog("test_table_catalog");
-
-        String db = "sr_test_" + genRandomUuid();
-        DATABASE_SET_TO_CLEAN.add(db);
-        catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
-        assertTrue(catalog.databaseExists(db));
-
-        String tbl = createAllTypesTable(db, "tbl");
-        Schema flinkSchema = createAllTypesFlinkSchema().toSchema();
-
-        assertTrue(catalog.tableExists(new ObjectPath(db, tbl)));
-        CatalogBaseTable table = catalog.getTable(new ObjectPath(db, tbl));
-        assertNotNull(table);
-
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        for (int i = 0; i < 11; i++) {
-            Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i);
-            schemaBuilder.column(column.getName(), column.getDataType());
-        }
schemaBuilder.column("c11", DataTypes.VARCHAR(STRING_SIZE)); - for (int i = 12; i < 15; i++) { - Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) flinkSchema.getColumns().get(i); - schemaBuilder.column(column.getName(), column.getDataType()); - } - schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumnNames()); - Schema expectFlinkSchema = schemaBuilder.build(); - Schema actualFlinkSchema = table.getUnresolvedSchema(); - - assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); - assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); - assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), - actualFlinkSchema.getPrimaryKey().get().getColumnNames()); - - String tblNotExist = "tbl_" + genRandomUuid(); - try { - catalog.getTable(new ObjectPath(db, tblNotExist)); - fail("Should fail because the table does not exist"); - } catch (Exception e) { - assertTrue(e instanceof TableNotExistException); - } - } - - @Test - public void testCreateTable() throws Exception { - Configuration tableConf = new Configuration(); - tableConf.setString("table.properties.replication_num", "1"); - StarRocksCatalog catalog = createCatalog("test_table_catalog", tableConf); - - String db = "sr_test_" + genRandomUuid(); - DATABASE_SET_TO_CLEAN.add(db); - catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); - assertTrue(catalog.databaseExists(db)); - String tbl = "tbl_" + genRandomUuid(); - ObjectPath objectPath = new ObjectPath(db, tbl); - TableSchema flinkSchema = createAllTypesFlinkSchema(); - CatalogBaseTable catalogTable = new CatalogTableImpl(flinkSchema, Collections.emptyMap(), null); - assertFalse(catalog.tableExists(objectPath)); - catalog.createTable(objectPath, catalogTable, false); - assertTrue(catalog.tableExists(objectPath)); - CatalogBaseTable actualCatalogTable = catalog.getTable(objectPath); - List tableColumns = new ArrayList<>(flinkSchema.getTableColumns()); - tableColumns.set(5, TableColumn.physical(tableColumns.get(5).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); - tableColumns.set(11, TableColumn.physical(tableColumns.get(11).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); - tableColumns.set(14, TableColumn.physical(tableColumns.get(14).getName(), DataTypes.VARCHAR(MAX_VARCHAR_SIZE))); - TableSchema.Builder schemaBuilder = new TableSchema.Builder(); - tableColumns.forEach(schemaBuilder::add); - schemaBuilder.primaryKey(flinkSchema.getPrimaryKey().get().getColumns().toArray(new String[0])); - Schema expectFlinkSchema = schemaBuilder.build().toSchema(); - Schema actualFlinkSchema = actualCatalogTable.getUnresolvedSchema(); - assertEquals(expectFlinkSchema.getColumns(), actualFlinkSchema.getColumns()); - assertTrue(actualFlinkSchema.getPrimaryKey().isPresent()); - assertEquals(expectFlinkSchema.getPrimaryKey().get().getColumnNames(), - actualFlinkSchema.getPrimaryKey().get().getColumnNames()); - - try { - catalog.createTable(objectPath, catalogTable, false); - fail("Should fail because the table exists"); - } catch (Exception e) { - assertTrue(e instanceof TableAlreadyExistException); - } - - String dbNotExist = "sr_test_" + genRandomUuid(); - ObjectPath objectPathNotExist = new ObjectPath(dbNotExist, tbl); - try { - catalog.createTable(objectPathNotExist, catalogTable, false); - fail("Should fail because the database does not exist"); - } catch (Exception e) { - assertTrue(e instanceof DatabaseNotExistException); - } - } + private String tableName; + private StarRocksCatalog 
-    @Test
-    public void testDropTable() throws Exception {
-        StarRocksCatalog catalog = createCatalog("test_table_catalog");
-
-        String db = "sr_test_" + genRandomUuid();
-        DATABASE_SET_TO_CLEAN.add(db);
-        catalog.createDatabase(db, new CatalogDatabaseImpl(Collections.emptyMap(), null), true);
-        assertTrue(catalog.databaseExists(db));
-        String tbl = createAllTypesTable(db, "tbl");
-
-        ObjectPath objectPath = new ObjectPath(db, tbl);
-        assertTrue(catalog.tableExists(objectPath));
-        catalog.dropTable(objectPath, false);
-        assertFalse(catalog.tableExists(objectPath));
-        // test ignoreIfNotExists
-        catalog.dropTable(objectPath, true);
-        try {
-            catalog.dropTable(objectPath, false);
-            fail("Should fail because the table does not exist");
-        } catch (Exception e) {
-            assertTrue(e instanceof TableNotExistException);
-        }
-    }
-
-    private String createAllTypesTable(String database, String tablePrefix) throws Exception {
-        String tableName = tablePrefix + "_" + genRandomUuid();
-        String createSql = String.format(
-                "CREATE TABLE `%s`.`%s` (" +
-                        "c0 BOOLEAN," +
-                        "c1 TINYINT," +
-                        "c2 SMALLINT," +
-                        "c3 INT," +
-                        "c4 BIGINT," +
-                        "c5 LARGEINT," +
-                        "c6 FLOAT," +
-                        "c7 DOUBLE," +
-                        "c8 DECIMAL(23,5)," +
-                        "c9 CHAR(7)," +
-                        "c10 VARCHAR(109)," +
-                        "c11 STRING," +
-                        "c12 DATE," +
-                        "c13 DATETIME," +
-                        "c14 JSON" +
-                        ") ENGINE = OLAP " +
-                        "PRIMARY KEY(c0,c1,c2,c3) " +
-                        "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
-                        "PROPERTIES (" +
-                        "\"replication_num\" = \"1\"" +
-                        ")",
-                database, tableName);
-        executeSrSQL(createSql);
-        return tableName;
-    }
-
-    private TableSchema createAllTypesFlinkSchema() {
-        return new TableSchema.Builder()
-                .field("c0", DataTypes.BOOLEAN().notNull())
-                .field("c1", DataTypes.TINYINT().notNull())
-                .field("c2", DataTypes.SMALLINT().notNull())
-                .field("c3", DataTypes.INT().notNull())
-                .field("c4", DataTypes.BIGINT())
-                .field("c5", DataTypes.STRING())
-                .field("c6", DataTypes.FLOAT())
-                .field("c7", DataTypes.DOUBLE())
-                .field("c8", DataTypes.DECIMAL(23, 5))
-                .field("c9", DataTypes.CHAR(7))
-                .field("c10", DataTypes.VARCHAR(109))
-                .field("c11", DataTypes.STRING())
-                .field("c12", DataTypes.DATE())
-                .field("c13", DataTypes.TIMESTAMP(0))
-                .field("c14", DataTypes.STRING())
-                .primaryKey("c0", "c1", "c2", "c3")
-                .build();
-    }
-
-    private StarRocksCatalog createCatalog(String catalogName) {
-        return createCatalog(catalogName, new Configuration());
-    }
-
-    private StarRocksCatalog createCatalog(String catalogName, Configuration tableConf) {
-        return new StarRocksCatalog(
-                catalogName,
-                getJdbcUrl(),
-                getHttpUrls(),
-                "root",
-                "",
-                DB_NAME,
-                new Configuration(),
-                new Configuration(),
-                tableConf,
-                Thread.currentThread().getContextClassLoader()
-        );
-    }
-
-    @Test
-    public void testInsertAndSelect() throws Exception {
-        String tableName = createPkTable("test_read_write");
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-        env.setParallelism(1);
-
-        String catalogName = "test_catalog";
-        String createCatalogSql = String.format(
-                "CREATE CATALOG %s WITH (" +
-                        " 'type' = 'starrocks',\n" +
-                        " 'jdbc-url' = '%s',\n" +
-                        " 'http-url' = '%s',\n" +
-                        " 'username' = '%s',\n" +
-                        " 'password' = '%s',\n" +
-                        " 'default-database' = '%s'" +
-                        ");",
-                catalogName, getJdbcUrl(), getHttpUrls(), "root", "", "default-db");
-        tEnv.executeSql(createCatalogSql);
-        tEnv.executeSql(String.format("USE CATALOG %s;", catalogName));
-        tEnv.executeSql(String.format("USE %s;", DB_NAME));
-        tEnv.executeSql(
-                String.format(
-                        "INSERT INTO %s VALUES (1, '100'), (2, '200');",
-                        tableName
-                ))
-                .await();
-
-        List<Row> results =
-                CollectionUtil.iteratorToList(
-                        tEnv.sqlQuery(
-                                String.format(
-                                        "SELECT * FROM %s",
-                                        tableName))
-                                .execute()
-                                .collect());
-        results.sort(Comparator.comparingInt(row -> (int) row.getField(0)));
-
-        List<Row> expectRows = Arrays.asList(
-                Row.ofKind(RowKind.INSERT, 1, "100"),
-                Row.ofKind(RowKind.INSERT, 2, "200")
-        );
-
-        assertThat(results).isEqualTo(expectRows);
-    }
-
-    private String createPkTable(String tablePrefix) throws Exception {
-        String tableName = tablePrefix + "_" + genRandomUuid();
+    @Before
+    public void prepare() throws Exception {
+        this.tableName = "test_catalog_" + genRandomUuid();
+        this.catalog = new StarRocksCatalog(getJdbcUrl(), USERNAME, PASSWORD);
         String createStarRocksTable =
                 String.format(
                         "CREATE TABLE `%s`.`%s` (" +
                                 "c0 INT," +
-                                "c1 STRING" +
+                                "c1 FLOAT," +
+                                "c2 BOOLEAN," +
+                                "c3 DATE" +
                                 ") ENGINE = OLAP " +
                                 "PRIMARY KEY(c0) " +
                                 "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
                                 "PROPERTIES (" +
                                 "\"replication_num\" = \"1\"" +
@@ -381,6 +60,71 @@ private String createPkTable(String tablePrefix) throws Exception {
                                 ")",
                         DB_NAME, tableName);
         executeSrSQL(createStarRocksTable);
-        return tableName;
+    }
+
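+    // Schema changes in StarRocks run asynchronously; the final argument of
+    // alterAddColumns/alterDropColumns (60 here) bounds how many seconds to
+    // wait for the ALTER TABLE job to take effect.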
+    @Test
+    public void testAlterAddColumns() throws Exception {
+        StarRocksTable oldTable = catalog.getTable(DB_NAME, tableName).orElse(null);
+        assertNotNull(oldTable);
+        List<StarRocksColumn> addColumns = new ArrayList<>();
+
+        StarRocksColumn c4 = new StarRocksColumn.Builder()
+                .setColumnName("c4")
+                .setOrdinalPosition(4)
+                .setDataType("BIGINT")
+                .setNullable(true)
+                .setColumnSize(19)
+                .setDecimalDigits(0)
+                .setColumnComment("add c4")
+                .build();
+        addColumns.add(c4);
+
+        StarRocksColumn c5 = new StarRocksColumn.Builder()
+                .setColumnName("c5")
+                .setOrdinalPosition(5)
+                .setDataType("DECIMAL")
+                .setNullable(true)
+                .setColumnSize(20)
+                .setDecimalDigits(1)
+                .setColumnComment("add c5")
+                .build();
+        addColumns.add(c5);
+
+        StarRocksColumn c6 = new StarRocksColumn.Builder()
+                .setColumnName("c6")
+                .setOrdinalPosition(6)
+                .setDataType("DATETIME")
+                .setNullable(true)
+                .setDefaultValue(null)
+                .setColumnSize(null)
+                .setDecimalDigits(null)
+                .setColumnComment("add c6")
+                .build();
+        addColumns.add(c6);
+
+        catalog.alterAddColumns(DB_NAME, tableName, addColumns, 60);
+        StarRocksTable newTable = catalog.getTable(DB_NAME, tableName).orElse(null);
+        assertNotNull(newTable);
+
+        List<StarRocksColumn> expectedColumns = new ArrayList<>();
+        expectedColumns.addAll(oldTable.getColumns());
+        expectedColumns.addAll(addColumns);
+
+        assertEquals(expectedColumns, newTable.getColumns());
+    }
+
+    @Test
+    public void testAlterDropColumns() throws Exception {
+        StarRocksTable oldTable = catalog.getTable(DB_NAME, tableName).orElse(null);
+        assertNotNull(oldTable);
+        List<String> dropColumns = Arrays.asList("c2", "c3");
+        catalog.alterDropColumns(DB_NAME, tableName, dropColumns, 60);
+        StarRocksTable newTable = catalog.getTable(DB_NAME, tableName).orElse(null);
+        assertNotNull(newTable);
+
+        List<StarRocksColumn> expectedColumns = oldTable.getColumns().stream()
+                .filter(column -> !dropColumns.contains(column.getColumnName()))
+                .collect(Collectors.toList());
+        assertEquals(expectedColumns, newTable.getColumns());
     }
 }
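
A minimal sketch of driving the abstracted StarRocksCatalog programmatically, assuming only the constructor and methods exercised by the tests above; the JDBC endpoint, database, and table names are placeholders, and any open()/close() lifecycle or StarRocksCatalogException handling is skipped here, as in the tests:

    import com.starrocks.connector.flink.catalog.StarRocksCatalog;
    import com.starrocks.connector.flink.catalog.StarRocksColumn;
    import com.starrocks.connector.flink.catalog.StarRocksTable;

    import java.util.Collections;

    public class StarRocksCatalogSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder FE endpoint and credentials.
            StarRocksCatalog catalog =
                    new StarRocksCatalog("jdbc:mysql://fe-host:9030", "root", "");

            // getTable returns an Optional; "my_db" and "my_table" are placeholders.
            StarRocksTable table = catalog.getTable("my_db", "my_table")
                    .orElseThrow(() -> new IllegalStateException("table not found"));

            // Append a nullable BIGINT column, waiting up to 60 seconds for the
            // asynchronous ALTER TABLE job, mirroring testAlterAddColumns above.
            StarRocksColumn newColumn = new StarRocksColumn.Builder()
                    .setColumnName("new_col")
                    .setOrdinalPosition(table.getColumns().size())
                    .setDataType("BIGINT")
                    .setNullable(true)
                    .setColumnComment("added via StarRocksCatalog")
                    .build();
            catalog.alterAddColumns("my_db", "my_table",
                    Collections.singletonList(newColumn), 60);
        }
    }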