Skip to content

Commit

Permalink
[Refactor] Abstract StarRocksCatalog to be used in Flink CDC 3.0
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 27, 2023
1 parent 70f6685 commit d4733ff
Show file tree
Hide file tree
Showing 13 changed files with 1,915 additions and 1,272 deletions.
615 changes: 615 additions & 0 deletions src/main/java/com/starrocks/connector/flink/catalog/FlinkCatalog.java

Large diffs are not rendered by default.

935 changes: 442 additions & 493 deletions src/main/java/com/starrocks/connector/flink/catalog/StarRocksCatalog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.catalog;

/** A catalog-related, runtime exception. */
public class StarRocksCatalogException extends RuntimeException {

/** @param message the detail message. */
public StarRocksCatalogException(String message) {
super(message);
}

/** @param cause the cause. */
public StarRocksCatalogException(Throwable cause) {
super(cause);
}

/**
* @param message the detail message.
* @param cause the cause.
*/
public StarRocksCatalogException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Catalog createCatalog(Context context) {
Configuration tableBaseConfig =
Configuration.fromMap(
getPrefixConfigs("table.", context.getOptions(), false));
return new StarRocksCatalog(
return new FlinkCatalog(
context.getName(),
helper.getOptions().get(CatalogOptions.FE_JDBC_URL),
helper.getOptions().get(CatalogOptions.FE_HTTP_URL),
Expand Down
204 changes: 129 additions & 75 deletions src/main/java/com/starrocks/connector/flink/catalog/StarRocksColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,145 +20,199 @@

package com.starrocks.connector.flink.catalog;

import java.io.Serializable;
import javax.annotation.Nullable;

/** Describe a column of StarRocks table. */
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Describe a column of StarRocks table. These metas are from information_schema.COLUMNS. */
public class StarRocksColumn implements Serializable {

private static final long serialVersionUID = 1L;

private final String name;
// Index of column in table (starting at 0)
/** The name of the column. COLUMN_NAME in information_schema.COLUMNS. */
private final String columnName;

/**
* The position of the column within the table (starting at 0). ORDINAL_POSITION in
* information_schema.COLUMNS.
*/
private final int ordinalPosition;
private final String type;
@Nullable
private final String key;
@Nullable
private final Integer size;
@Nullable
private final Integer scale;
@Nullable
private final String defaultValue;

/** The column data type. DATA_TYPE in information_schema.COLUMNS. */
private final String dataType;

/** The column nullability.IS_NULLABLE in information_schema.COLUMNS. */
private final boolean isNullable;
@Nullable
private final String comment;

private StarRocksColumn(String name, int ordinalPosition, String type, @Nullable String key,
@Nullable Integer size, @Nullable Integer scale, @Nullable String defaultValue,
boolean isNullable, @Nullable String comment) {
this.name = name;
/** The default value for the column. COLUMN_DEFAULT in information_schema.COLUMNS. */
@Nullable private final String defaultValue;

/**
* The column size. COLUMN_SIZE in information_schema.COLUMNS. For numeric data, this is the
* maximum precision. For character data, this is the length in characters. For other data
* types, this is null.
*/
@Nullable private final Integer columnSize;

/**
* The number of fractional digits for numeric data. This is null for other data types.
* DECIMAL_DIGITS in information_schema.COLUMNS.
*/
@Nullable private final Integer decimalDigits;

/** The column comment. COLUMN_COMMENT in information_schema.COLUMNS. */
@Nullable private final String columnComment;


private StarRocksColumn(
String columnName,
int ordinalPosition,
String dataType,
boolean isNullable,
@Nullable String defaultValue,
@Nullable Integer columnSize,
@Nullable Integer decimalDigits,
@Nullable String columnComment) {
this.columnName = checkNotNull(columnName);
this.ordinalPosition = ordinalPosition;
this.type = type;
this.key = key;
this.size = size;
this.scale = scale;
this.defaultValue = defaultValue;
this.dataType = checkNotNull(dataType);
this.isNullable = isNullable;
this.comment = comment;
this.defaultValue = defaultValue;
this.columnSize = columnSize;
this.decimalDigits = decimalDigits;
this.columnComment = columnComment;
}

public String getName() {
return name;
public String getColumnName() {
return columnName;
}

public int getOrdinalPosition() {
return ordinalPosition;
}

public String getType() {
return type;
public String getDataType() {
return dataType;
}

public boolean isNullable() {
return isNullable;
}

@Nullable
public String getKey() {
return key;
public Optional<String> getDefaultValue() {
return Optional.ofNullable(defaultValue);
}

@Nullable
public Integer getSize() {
return size;
public Optional<Integer> getColumnSize() {
return Optional.ofNullable(columnSize);
}

@Nullable
public Integer getScale() {
return scale;
public Optional<Integer> getDecimalDigits() {
return Optional.ofNullable(decimalDigits);
}

@Nullable
public String getDefaultValue() {
return defaultValue;
public Optional<String> getColumnComment() {
return Optional.ofNullable(columnComment);
}

public boolean isNullable() {
return isNullable;
@Override
public String toString() {
return "StarRocksColumn{" +
"columnName='" + columnName + '\'' +
", ordinalPosition=" + ordinalPosition +
", dataType='" + dataType + '\'' +
", isNullable=" + isNullable +
", defaultValue='" + defaultValue + '\'' +
", columnSize=" + columnSize +
", decimalDigits=" + decimalDigits +
", columnComment='" + columnComment + '\'' +
'}';
}

@Nullable
public String getComment() {
return comment;
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StarRocksColumn column = (StarRocksColumn) o;
return ordinalPosition == column.ordinalPosition && isNullable == column.isNullable &&
Objects.equals(columnName, column.columnName) &&
dataType.equalsIgnoreCase(column.dataType) &&
Objects.equals(defaultValue, column.defaultValue) &&
Objects.equals(columnSize, column.columnSize) &&
Objects.equals(decimalDigits, column.decimalDigits) &&
Objects.equals(columnComment, column.columnComment);
}

/** Build a {@link StarRocksColumn}. */
public static class Builder {

private String name;
private String columnName;
private int ordinalPosition;
private String type;
private String key;
private Integer size;
private Integer scale;
private String dataType;
private boolean isNullable = true;
private String defaultValue;
boolean isNullable;
private String comment;
private Integer columnSize;
private Integer decimalDigits;
private String columnComment;

public Builder setName(String name) {
this.name = name;
public StarRocksColumn.Builder setColumnName(String columnName) {
this.columnName = columnName;
return this;
}

public Builder setOrdinalPosition(int ordinalPosition) {
public StarRocksColumn.Builder setOrdinalPosition(int ordinalPosition) {
this.ordinalPosition = ordinalPosition;
return this;
}

public Builder setType(String type) {
this.type = type;
public StarRocksColumn.Builder setDataType(String dataType) {
this.dataType = dataType;
return this;
}

public Builder setKey(String key) {
this.key = key;
public StarRocksColumn.Builder setNullable(boolean isNullable) {
this.isNullable = isNullable;
return this;
}

public Builder setSize(Integer size) {
this.size = size;
return this;
}

public Builder setScale(Integer scale) {
this.scale = scale;
public StarRocksColumn.Builder setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
return this;
}

public Builder setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
public StarRocksColumn.Builder setColumnSize(Integer columnSize) {
this.columnSize = columnSize;
return this;
}

public Builder setNullable(boolean nullable) {
isNullable = nullable;
public StarRocksColumn.Builder setDecimalDigits(Integer decimalDigits) {
this.decimalDigits = decimalDigits;
return this;
}

public Builder setComment(String comment) {
this.comment = comment;
public StarRocksColumn.Builder setColumnComment(String columnComment) {
this.columnComment = columnComment;
return this;
}

public StarRocksColumn build() {
return new StarRocksColumn(name, ordinalPosition, type, key, size, scale,
defaultValue, isNullable, comment);
return new StarRocksColumn(
columnName,
ordinalPosition,
dataType,
isNullable,
defaultValue,
columnSize,
decimalDigits,
columnComment);
}
}
}
Loading

0 comments on commit d4733ff

Please sign in to comment.