Skip to content

Commit

Permalink
[Feature][Catalog] Catalog add Case Conversion Definition (#5328)
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoJiang521 authored Sep 12, 2023
1 parent de7b86a commit 7b5b28b
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Getter
@EqualsAndHashCode
Expand Down Expand Up @@ -54,28 +56,52 @@ public static TablePath of(String databaseName, String schemaName, String tableN
}

public String getSchemaAndTableName() {
return String.format("%s.%s", schemaName, tableName);
return getNameCommon(null, schemaName, tableName, null, null);
}

public String getSchemaAndTableName(String quote) {
return getNameCommon(null, schemaName, tableName, quote, quote);
}

public String getFullName() {
if (schemaName == null) {
return String.format("%s.%s", databaseName, tableName);
}
return String.format("%s.%s.%s", databaseName, schemaName, tableName);
return getNameCommon(databaseName, schemaName, tableName, null, null);
}

public String getFullNameWithQuoted() {
return getFullNameWithQuoted("`");
}

public String getFullNameWithQuoted(String quote) {
if (schemaName == null) {
return String.format(
"%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote);
return getNameCommon(databaseName, schemaName, tableName, quote, quote);
}

public String getFullNameWithQuoted(String quoteLeft, String quoteRight) {
return getNameCommon(databaseName, schemaName, tableName, quoteLeft, quoteRight);
}

private String getNameCommon(
String databaseName,
String schemaName,
String tableName,
String quoteLeft,
String quoteRight) {
List<String> joinList = new ArrayList<>();
quoteLeft = quoteLeft == null ? "" : quoteLeft;
quoteRight = quoteRight == null ? "" : quoteRight;

if (databaseName != null) {
joinList.add(quoteLeft + databaseName + quoteRight);
}

if (schemaName != null) {
joinList.add(quoteLeft + schemaName + quoteRight);
}
return String.format(
"%s%s%s.%s%s%s.%s%s%s",
quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote);

if (tableName != null) {
joinList.add(quoteLeft + tableName + quoteRight);
}

return String.join(".", joinList);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class MysqlCreateTableSqlBuilder {

private MysqlDataTypeConvertor mysqlDataTypeConvertor;

private String fieldIde;

private MysqlCreateTableSqlBuilder(String tableName) {
checkNotNull(tableName, "tableName must not be null");
this.tableName = tableName;
Expand All @@ -76,7 +79,8 @@ public static MysqlCreateTableSqlBuilder builder(
.charset(null)
.primaryKey(tableSchema.getPrimaryKey())
.constraintKeys(tableSchema.getConstraintKeys())
.addColumn(tableSchema.getColumns());
.addColumn(tableSchema.getColumns())
.fieldIde(catalogTable.getOptions().get("fieldIde"));
}

public MysqlCreateTableSqlBuilder addColumn(List<Column> columns) {
Expand All @@ -90,6 +94,11 @@ public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) {
return this;
}

public MysqlCreateTableSqlBuilder fieldIde(String fieldIde) {
this.fieldIde = fieldIde;
return this;
}

public MysqlCreateTableSqlBuilder constraintKeys(List<ConstraintKey> constraintKeys) {
this.constraintKeys = constraintKeys;
return this;
Expand Down Expand Up @@ -120,7 +129,8 @@ public String build(String catalogName) {
sqls.add(
String.format(
"CREATE TABLE %s (\n%s\n)",
tableName, buildColumnsIdentifySql(catalogName)));
CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"),
buildColumnsIdentifySql(catalogName)));
if (engine != null) {
sqls.add("ENGINE = " + engine);
}
Expand Down Expand Up @@ -157,7 +167,7 @@ private String buildColumnsIdentifySql(String catalogName) {

private String buildColumnIdentifySql(Column column, String catalogName) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(column.getName());
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
if (StringUtils.equals(catalogName, "mysql")) {
columnSqls.add(column.getSourceType());
} else {
Expand Down Expand Up @@ -243,7 +253,7 @@ private String buildPrimaryKeySql() {
.map(columnName -> "`" + columnName + "`")
.collect(Collectors.joining(", "));
// add sort type
return String.format("PRIMARY KEY (%s)", key);
return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
}

private String buildConstraintKeySql(ConstraintKey constraintKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -36,23 +37,27 @@ public class OracleCreateTableSqlBuilder {
private PrimaryKey primaryKey;
private OracleDataTypeConvertor oracleDataTypeConvertor;
private String sourceCatalogName;
private String fieldIde;

public OracleCreateTableSqlBuilder(CatalogTable catalogTable) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.oracleDataTypeConvertor = new OracleDataTypeConvertor();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
}

public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(tablePath.getSchemaAndTableName("\""))
.append(" (\n");

List<String> columnSqls =
columns.stream().map(this::buildColumnSql).collect(Collectors.toList());
columns.stream()
.map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
.collect(Collectors.toList());

// Add primary key directly in the create table statement
if (primaryKey != null
Expand All @@ -70,7 +75,7 @@ public String build(TablePath tablePath) {
.map(
column ->
buildColumnCommentSql(
column, tablePath.getSchemaAndTableName()))
column, tablePath.getSchemaAndTableName("\"")))
.collect(Collectors.toList());

if (!commentSqls.isEmpty()) {
Expand All @@ -83,7 +88,7 @@ public String build(TablePath tablePath) {

private String buildColumnSql(Column column) {
StringBuilder columnSql = new StringBuilder();
columnSql.append(column.getName()).append(" ");
columnSql.append("\"").append(column.getName()).append("\" ");

String columnType =
sourceCatalogName.equals("oracle")
Expand All @@ -95,11 +100,6 @@ private String buildColumnSql(Column column) {
columnSql.append(" NOT NULL");
}

// if (column.getDefaultValue() != null) {
// columnSql.append(" DEFAULT
// '").append(column.getDefaultValue().toString()).append("'");
// }

return columnSql.toString();
}

Expand Down Expand Up @@ -140,29 +140,37 @@ private String buildColumnType(Column column) {

private String buildPrimaryKeySql(PrimaryKey primaryKey) {
String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4);
String columnNamesString = String.join(", ", primaryKey.getColumnNames());
String columnNamesString =
primaryKey.getColumnNames().stream()
.map(columnName -> "\"" + columnName + "\"")
.collect(Collectors.joining(", "));

// In Oracle database, the maximum length for an identifier is 30 characters.
String primaryKeyStr = primaryKey.getPrimaryKey();
if (primaryKeyStr.length() > 25) {
primaryKeyStr = primaryKeyStr.substring(0, 25);
}

return "CONSTRAINT "
+ primaryKeyStr
+ "_"
+ randomSuffix
+ " PRIMARY KEY ("
+ columnNamesString
+ ")";
return CatalogUtils.getFieldIde(
"CONSTRAINT "
+ primaryKeyStr
+ "_"
+ randomSuffix
+ " PRIMARY KEY ("
+ columnNamesString
+ ")",
fieldIde);
}

private String buildColumnCommentSql(Column column, String tableName) {
StringBuilder columnCommentSql = new StringBuilder();
columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append(".");
columnCommentSql
.append(column.getName())
.append(" IS '")
.append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
.append(tableName)
.append(".");
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
.append(column.getComment())
.append("'");
return columnCommentSql.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -37,23 +38,30 @@ public class PostgresCreateTableSqlBuilder {
private PrimaryKey primaryKey;
private PostgresDataTypeConvertor postgresDataTypeConvertor;
private String sourceCatalogName;
private String fieldIde;

public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) {
this.columns = catalogTable.getTableSchema().getColumns();
this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
this.postgresDataTypeConvertor = new PostgresDataTypeConvertor();
this.sourceCatalogName = catalogTable.getCatalogName();
this.fieldIde = catalogTable.getOptions().get("fieldIde");
}

public String build(TablePath tablePath) {
StringBuilder createTableSql = new StringBuilder();
createTableSql
.append("CREATE TABLE ")
.append(tablePath.getSchemaAndTableName())
.append(CatalogUtils.quoteIdentifier("CREATE TABLE ", fieldIde))
.append(tablePath.getSchemaAndTableName("\""))
.append(" (\n");

List<String> columnSqls =
columns.stream().map(this::buildColumnSql).collect(Collectors.toList());
columns.stream()
.map(
column ->
CatalogUtils.quoteIdentifier(
buildColumnSql(column), fieldIde))
.collect(Collectors.toList());

createTableSql.append(String.join(",\n", columnSqls));
createTableSql.append("\n);");
Expand All @@ -64,7 +72,7 @@ public String build(TablePath tablePath) {
.map(
columns ->
buildColumnCommentSql(
columns, tablePath.getSchemaAndTableName()))
columns, tablePath.getSchemaAndTableName("\"")))
.collect(Collectors.toList());

if (!commentSqls.isEmpty()) {
Expand All @@ -77,7 +85,7 @@ public String build(TablePath tablePath) {

private String buildColumnSql(Column column) {
StringBuilder columnSql = new StringBuilder();
columnSql.append(column.getName()).append(" ");
columnSql.append("\"").append(column.getName()).append("\" ");

// For simplicity, assume the column type in SeaTunnelDataType is the same as in PostgreSQL
String columnType =
Expand All @@ -96,12 +104,6 @@ private String buildColumnSql(Column column) {
columnSql.append(" PRIMARY KEY");
}

// Add default value if exists
// if (column.getDefaultValue() != null) {
// columnSql.append(" DEFAULT
// '").append(column.getDefaultValue().toString()).append("'");
// }

return columnSql.toString();
}

Expand Down Expand Up @@ -133,10 +135,13 @@ private String buildColumnType(Column column) {

private String buildColumnCommentSql(Column column, String tableName) {
StringBuilder columnCommentSql = new StringBuilder();
columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append(".");
columnCommentSql
.append(column.getName())
.append(" IS '")
.append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
.append(tableName)
.append(".");
columnCommentSql
.append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
.append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
.append(column.getComment())
.append("'");
return columnCommentSql.toString();
Expand Down
Loading

0 comments on commit 7b5b28b

Please sign in to comment.