Skip to content

Commit

Permalink
[Feature] Support Flink catalog (#295)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
Co-authored-by: melin <[email protected]>
  • Loading branch information
banmoy and melin authored Oct 30, 2023
1 parent 74ce00e commit f55a5b8
Show file tree
Hide file tree
Showing 24 changed files with 2,614 additions and 227 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.catalog;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.CommonCatalogOptions;

public class CatalogOptions {

public static final String IDENTIFIER = "starrocks";

public static ConfigOption<String> FE_JDBC_URL =
ConfigOptions.key("jdbc-url")
.stringType()
.noDefaultValue()
.withDescription("StarRocks JDBC url like: `jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...`.");

public static ConfigOption<String> FE_HTTP_URL =
ConfigOptions.key("http-url")
.stringType()
.noDefaultValue()
.withDescription("StarRocks FE http url like: `fe_ip1:http_port,http://fe_ip2:http_port`.");

public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
.withDescription("StarRocks user name.");

public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
.withDescription("StarRocks user password.");

public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
.stringType()
.noDefaultValue()
.withDescription("The default database.");

// ------ options for create table ------

public static final String TABLE_PROPERTIES_PREFIX = "table.properties.";

public static final ConfigOption<Integer> TABLE_NUM_BUCKETS =
ConfigOptions.key("table.num-buckets")
.intType()
.noDefaultValue()
.withDescription("Number of buckets for creating StarRocks table.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.catalog;

import java.util.HashMap;
import java.util.Map;

public class ConfigUtils {

public static Map<String, String> getPrefixConfigs(
String prefix, Map<String, String> conf, boolean removePrefix) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, String> entry : conf.entrySet()) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
if (removePrefix) {
result.put(key.substring(prefix.length()), entry.getValue());
} else {
result.put(key, entry.getValue());
}
}
}
return result;
}
}
55 changes: 55 additions & 0 deletions src/main/java/com/starrocks/connector/flink/catalog/JdbcUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.catalog;

import org.apache.flink.table.api.ValidationException;

public class JdbcUtils {

// Driver name for mysql connector 5.1 which is deprecated in 8.0
private static final String MYSQL_51_DRIVER_NAME = "com.mysql.jdbc.Driver";

// Driver name for mysql connector 8.0
private static final String MYSQL_80_DRIVER_NAME = "com.mysql.cj.jdbc.Driver";

private static final String MYSQL_SITE_URL = "https://dev.mysql.com/downloads/connector/j/";
private static final String MAVEN_CENTRAL_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/";

public static String getJdbcUrl(String host, int port) {
return String.format("jdbc:mysql://%s:%d", host, port);
}

public static void verifyJdbcDriver() {
try {
Class.forName(MYSQL_80_DRIVER_NAME);
} catch (ClassNotFoundException e) {
try {
Class.forName(MYSQL_51_DRIVER_NAME);
} catch (ClassNotFoundException ie) {
String msg = String.format("Can't find mysql jdbc driver, please download it and " +
"put it in your classpath manually. You can download it from MySQL " +
"site %s, or Maven Central %s",
MYSQL_SITE_URL, MAVEN_CENTRAL_URL);
throw new ValidationException(msg, ie);
}
}
}
}
Loading

0 comments on commit f55a5b8

Please sign in to comment.