Skip to content

Commit

Permalink
[Improve][JdbcSource] Optimize catalog-table metadata merge logic (#5828
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hailin0 authored Nov 11, 2023
1 parent f69f773 commit 7d8028a
Show file tree
Hide file tree
Showing 2 changed files with 359 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,64 +173,58 @@ private static CatalogTable getCatalogTable(
return jdbcCatalog.getTable(tableConfig.getQuery());
}

private static CatalogTable mergeCatalogTable(
CatalogTable tableOfPath, CatalogTable tableOfQuery) {
String catalogName =
tableOfQuery.getTableId() == null
? DEFAULT_CATALOG_NAME
: tableOfQuery.getTableId().getCatalogName();
TableIdentifier tableIdentifier =
TableIdentifier.of(
catalogName,
tableOfPath.getTableId().getDatabaseName(),
tableOfPath.getTableId().getSchemaName(),
tableOfPath.getTableId().getTableName());

static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tableOfQuery) {
TableSchema tableSchemaOfPath = tableOfPath.getTableSchema();
Map<String, Column> columnsOfPath =
tableSchemaOfPath.getColumns().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));
Set<String> columnKeysOfPath = columnsOfPath.keySet();
.collect(
Collectors.toMap(
Column::getName,
Function.identity(),
(o1, o2) -> o1,
LinkedHashMap::new));
TableSchema tableSchemaOfQuery = tableOfQuery.getTableSchema();
Map<String, Column> columnsOfQuery =
tableSchemaOfQuery.getColumns().stream()
.collect(Collectors.toMap(Column::getName, Function.identity()));
.collect(
Collectors.toMap(
Column::getName,
Function.identity(),
(o1, o2) -> o1,
LinkedHashMap::new));
Set<String> columnKeysOfQuery = columnsOfQuery.keySet();

if (columnKeysOfPath.equals(columnKeysOfQuery)) {
boolean schemaEquals =
columnKeysOfPath.stream()
.allMatch(
key ->
columnsOfPath
.get(key)
.getDataType()
.equals(columnsOfQuery.get(key).getDataType()));
if (schemaEquals) {
return CatalogTable.of(
tableIdentifier,
TableSchema.builder()
.primaryKey(tableSchemaOfPath.getPrimaryKey())
.constraintKey(tableSchemaOfPath.getConstraintKeys())
.columns(tableSchemaOfQuery.getColumns())
.build(),
tableOfPath.getOptions(),
tableOfPath.getPartitionKeys(),
tableOfPath.getComment(),
tableIdentifier.getCatalogName());
}
List<Column> columnsOfMerge =
tableSchemaOfQuery.getColumns().stream()
.filter(
column ->
columnsOfPath.containsKey(column.getName())
&& columnsOfPath
.get(column.getName())
.getDataType()
.equals(
columnsOfQuery
.get(column.getName())
.getDataType()))
.map(column -> columnsOfPath.get(column.getName()))
.collect(Collectors.toList());
boolean schemaIncludeAllColumns = columnsOfMerge.size() == columnKeysOfQuery.size();
boolean schemaEquals =
schemaIncludeAllColumns && columnsOfMerge.size() == columnsOfPath.size();
if (schemaEquals) {
return tableOfPath;
}

PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
List<ConstraintKey> constraintKeysOfPath = tableSchemaOfPath.getConstraintKeys();
List<String> partitionKeysOfPath = tableOfPath.getPartitionKeys();
PrimaryKey primaryKeyOfQuery = null;
List<ConstraintKey> constraintKeysOfQuery = new ArrayList<>();
List<String> partitionKeysOfQuery = new ArrayList<>();
PrimaryKey primaryKeyOfMerge = null;
List<ConstraintKey> constraintKeysOfMerge = new ArrayList<>();
List<String> partitionKeysOfMerge = new ArrayList<>();

if (primaryKeyOfPath != null
&& columnKeysOfQuery.containsAll(primaryKeyOfPath.getColumnNames())) {
primaryKeyOfQuery = primaryKeyOfPath;
primaryKeyOfMerge = primaryKeyOfPath;
}
if (constraintKeysOfPath != null) {
for (ConstraintKey constraintKey : constraintKeysOfPath) {
Expand All @@ -239,26 +233,47 @@ private static CatalogTable mergeCatalogTable(
.map(e -> e.getColumnName())
.collect(Collectors.toSet());
if (columnKeysOfQuery.containsAll(constraintKeyFields)) {
constraintKeysOfQuery.add(constraintKey);
constraintKeysOfMerge.add(constraintKey);
}
}
}
if (partitionKeysOfPath != null && columnKeysOfQuery.containsAll(partitionKeysOfPath)) {
partitionKeysOfQuery = partitionKeysOfPath;
partitionKeysOfMerge = partitionKeysOfPath;
}
if (schemaIncludeAllColumns) {
return CatalogTable.of(
tableOfPath.getTableId(),
TableSchema.builder()
.primaryKey(primaryKeyOfMerge)
.constraintKey(constraintKeysOfMerge)
.columns(columnsOfMerge)
.build(),
tableOfPath.getOptions(),
partitionKeysOfMerge,
tableOfPath.getComment());
}

String catalogName =
tableOfQuery.getTableId() == null
? DEFAULT_CATALOG_NAME
: tableOfQuery.getTableId().getCatalogName();
TableIdentifier tableIdentifier =
TableIdentifier.of(
catalogName,
tableOfPath.getTableId().getDatabaseName(),
tableOfPath.getTableId().getSchemaName(),
tableOfPath.getTableId().getTableName());
CatalogTable mergedCatalogTable =
CatalogTable.of(
tableIdentifier,
TableSchema.builder()
.primaryKey(primaryKeyOfQuery)
.constraintKey(constraintKeysOfQuery)
.primaryKey(primaryKeyOfMerge)
.constraintKey(constraintKeysOfMerge)
.columns(tableSchemaOfQuery.getColumns())
.build(),
tableOfPath.getOptions(),
partitionKeysOfQuery,
tableOfPath.getComment(),
tableIdentifier.getCatalogName());
partitionKeysOfMerge,
tableOfPath.getComment());

log.info("Merged catalog table of path {}", tableOfPath.getTableId().toTablePath());
return mergedCatalogTable;
Expand Down
Loading

0 comments on commit 7d8028a

Please sign in to comment.