feat: Support lookup function
Signed-off-by: redscarf <[email protected]>
Jin-H committed Dec 8, 2023
1 parent bfa8c97 commit 22e6913
Showing 19 changed files with 1,614 additions and 224 deletions.
35 changes: 22 additions & 13 deletions docs/content/connector-source.md
@@ -92,20 +92,29 @@ Follow these steps to deploy the Flink connector:

### Common parameters

| Parameter | Required | Data type | Description |
| --------------------------- | -------- |-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | Yes | STRING | The type of connector that you want to use to read data. Set the value to `starrocks`. |
| scan-url | Yes | STRING | The address that is used to connect the FE from the web server. Format: `<fe_host>:<fe_http_port>`. The default port is `8030`. You can specify multiple addresses, which must be separated with a comma (,). Example: `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. |
| jdbc-url | Yes | STRING | The address that is used to connect the MySQL client of the FE. Format: `jdbc:mysql://<fe_host>:<fe_query_port>`. The default port number is `9030`. |
| username | Yes | STRING | The username of your StarRocks cluster account. The account must have read permissions on the StarRocks table you want to read. See [User privileges](../administration/User_privilege.md). |
| password | Yes | STRING | The password of your StarRocks cluster account. |
| database-name | Yes | STRING | The name of the StarRocks database to which the StarRocks table you want to read belongs. |
| table-name | Yes | STRING | The name of the StarRocks table you want to read. |
| scan.connect.timeout-ms | No | STRING | The maximum amount of time after which the connection from the Flink connector to your StarRocks cluster times out. Unit: milliseconds. Default value: `1000`. If the amount of time taken to establish the connection exceeds this limit, the read task fails. |
| scan.params.keep-alive-min | No | STRING | The maximum amount of time during which the read task keeps alive. The keep-alive time is checked on a regular basis by using a polling mechanism. Unit: minutes. Default value: `10`. We recommend that you set this parameter to a value that is greater than or equal to `5`. |
| scan.params.query-timeout-s | No | STRING | The maximum amount of time after which the read task times out. The timeout duration is checked during task execution. Unit: seconds. Default value: `600`. If no read result is returned after the time duration elapses, the read task stops. |
| scan.params.mem-limit-byte | No | STRING | The maximum amount of memory allowed per query on each BE. Unit: bytes. Default value: `1073741824`, equal to 1 GB. |
| scan.max-retries | No | STRING | The maximum number of times that the read task can be retried upon failures. Default value: `1`. If the number of times that the read task is retried exceeds this limit, the read task returns errors. |
| lookup.cache | No | ENUM | The cache strategy for the lookup table. Valid values: `NONE` (no caching) and `PARTIAL` (entries are cached as they are looked up in the external database). |
| lookup.max-retries | No | INTEGER | The maximum number of retries when a lookup query against the database fails. Default value: `3`. |
| lookup.partial-cache.expire-after-access | No | DURATION | The maximum time to live for each row in the lookup cache after the entry was last accessed. Takes effect only when `lookup.cache` is set to `PARTIAL`. |
| lookup.partial-cache.expire-after-write | No | DURATION | The maximum time to live for each row in the lookup cache after the entry was written to the cache. Takes effect only when `lookup.cache` is set to `PARTIAL`. |
| lookup.partial-cache.max-rows | No | LONG | The maximum number of rows to store in the cache. |
| lookup.partial-cache.cache-missing-key | No | BOOLEAN | Whether to cache an empty value when the lookup key does not match any row in the table. |

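To make the interaction of the `lookup.partial-cache.*` options more concrete, the following is a minimal, self-contained sketch of how a partial cache with write-based expiry, a row limit, and optional caching of missing keys could behave. It is an illustration under stated assumptions, not the connector's or Flink's cache implementation: the class `PartialLookupCacheSketch`, its fields, and the caller-supplied clock are all hypothetical, eviction is simplified to "oldest written entry first", and `expire-after-access` is not modeled.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical illustration only: not the connector's or Flink's cache implementation.
class PartialLookupCacheSketch {
    // A cached result plus its write time; an empty Optional records a missing key.
    private static final class CachedRow {
        final Optional<String> row;
        final long writtenAtMs;

        CachedRow(Optional<String> row, long writtenAtMs) {
            this.row = row;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long expireAfterWriteMs;  // models lookup.partial-cache.expire-after-write
    private final boolean cacheMissingKey;  // models lookup.partial-cache.cache-missing-key
    private final Map<String, CachedRow> entries;
    int backendCalls = 0;                   // number of lookups that reached the "database"

    PartialLookupCacheSketch(int maxRows, long expireAfterWriteMs, boolean cacheMissingKey) {
        this.expireAfterWriteMs = expireAfterWriteMs;
        this.cacheMissingKey = cacheMissingKey;
        // Models lookup.partial-cache.max-rows: drop the oldest written entry when full.
        this.entries = new LinkedHashMap<String, CachedRow>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedRow> eldest) {
                return size() > maxRows;
            }
        };
    }

    Optional<String> lookup(String key, long nowMs, Function<String, Optional<String>> backend) {
        CachedRow hit = entries.get(key);
        if (hit != null && nowMs - hit.writtenAtMs < expireAfterWriteMs) {
            return hit.row;  // fresh entry, possibly a cached "no such row"
        }
        entries.remove(key);  // expired or absent
        backendCalls++;
        Optional<String> row = backend.apply(key);
        if (row.isPresent() || cacheMissingKey) {
            entries.put(key, new CachedRow(row, nowMs));
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> table = Collections.singletonMap("1", "alice");
        Function<String, Optional<String>> backend = k -> Optional.ofNullable(table.get(k));
        PartialLookupCacheSketch cache = new PartialLookupCacheSketch(100, 1000L, true);

        cache.lookup("1", 0L, backend);     // not cached yet: backend call 1
        cache.lookup("1", 500L, backend);   // served from the cache
        cache.lookup("2", 500L, backend);   // missing key: backend call 2, cached as empty
        cache.lookup("2", 600L, backend);   // cached miss, no backend call
        cache.lookup("1", 1500L, backend);  // entry expired: backend call 3
        System.out.println(cache.backendCalls);  // prints 3
    }
}
```

With `cacheMissingKey` set to `false`, every lookup of a key that has no matching row goes to the backend, which is the trade-off the `lookup.partial-cache.cache-missing-key` option controls.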
The following parameters apply only to reading data by using DataStream API.
3 changes: 3 additions & 0 deletions pom.xml
@@ -527,6 +527,9 @@ limitations under the License.
</goals>
</execution>
</executions>
<configuration>
<source>${maven.compiler.source}</source>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
@@ -15,7 +15,9 @@
package com.starrocks.connector.flink.connection;

import org.apache.flink.annotation.Internal;

import java.sql.Connection;
import java.sql.SQLException;

/**
* connection provider.
@@ -27,6 +29,17 @@ public interface StarRocksJdbcConnectionIProvider {

Connection reestablishConnection() throws Exception;

boolean isConnectionValid() throws SQLException;

/**
* Get the existing connection, or establish a new one if there is none.
*
* @return existing connection or newly established connection
* @throws SQLException sql exception
* @throws ClassNotFoundException driver class not found
*/
Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;

void close();

}
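
The contract of `getOrEstablishConnection()` (reuse the current connection while it is valid, otherwise open a new one) can be sketched without a database. The following is a minimal illustration under stated assumptions: `GetOrEstablishSketch`, its `factory` and `isValid` parameters, and the `established` counter are hypothetical names, and the predicate merely stands in for `java.sql.Connection#isValid`. It is not the connector's provider class.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical names throughout; this is not the connector's provider class.
class GetOrEstablishSketch<C> {
    private final Supplier<C> factory;   // opens a new connection
    private final Predicate<C> isValid;  // stands in for java.sql.Connection#isValid
    private C current;                   // null until the first call
    int established = 0;                 // how many times the factory was invoked

    GetOrEstablishSketch(Supplier<C> factory, Predicate<C> isValid) {
        this.factory = factory;
        this.isValid = isValid;
    }

    // Reuse the current connection while it is valid; otherwise open a new one.
    C getOrEstablish() {
        if (current != null && isValid.test(current)) {
            return current;
        }
        current = factory.get();
        established++;
        return current;
    }

    public static void main(String[] args) {
        boolean[] healthy = {true};
        GetOrEstablishSketch<Object> provider =
                new GetOrEstablishSketch<>(Object::new, c -> healthy[0]);

        Object first = provider.getOrEstablish();
        Object second = provider.getOrEstablish();  // reused
        healthy[0] = false;                         // simulate a dropped connection
        Object third = provider.getOrEstablish();   // re-established

        System.out.println(first == second);        // prints true
        System.out.println(first == third);         // prints false
        System.out.println(provider.established);   // prints 2
    }
}
```

Keeping the validity check inside the accessor means callers never have to decide whether to reconnect; they always ask for a connection and get a usable one or an exception.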
@@ -63,7 +63,30 @@ public Connection getConnection() throws SQLException, ClassNotFoundException {
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
close();
connection = getConnection();
return getOrEstablishConnection();
}

public boolean isConnectionValid() throws SQLException {
return connection != null && connection.isValid(60);
}

@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (isConnectionValid() && !connection.isClosed()) {
return connection;
}
try {
Class.forName(jdbcOptions.getCjDriverName());
} catch (ClassNotFoundException ex) {
LOG.warn("Cannot find class {}, trying class {}", jdbcOptions.getCjDriverName(), jdbcOptions.getDriverName());
Class.forName(jdbcOptions.getDriverName());
}
if (jdbcOptions.getUsername().isPresent()) {
connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(),
jdbcOptions.getPassword().orElse(null));
} else {
connection = DriverManager.getConnection(jdbcOptions.getDbURL());
}
return connection;
}
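
The driver-loading step in `getOrEstablishConnection()` tries one driver class and falls back to a second if the first is not on the classpath. That pattern is sketched in isolation below. The class `DriverFallbackSketch` and the method `loadFirstAvailable` are hypothetical, and the class names passed in `main` are placeholders chosen so the example runs on a bare JDK; the real names come from `jdbcOptions.getCjDriverName()` and `jdbcOptions.getDriverName()`, whose values this diff does not show.

```java
// Hypothetical helper; it mirrors only the try-then-fall-back class loading shown above.
class DriverFallbackSketch {
    // Load the preferred class, or the fallback if the preferred one is not on the classpath.
    // If neither class exists, the ClassNotFoundException for the fallback propagates.
    static Class<?> loadFirstAvailable(String preferred, String fallback)
            throws ClassNotFoundException {
        try {
            return Class.forName(preferred);
        } catch (ClassNotFoundException ex) {
            System.err.printf("Cannot find class %s, trying class %s%n", preferred, fallback);
            return Class.forName(fallback);
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Placeholder names: the first does not exist, the second is always present in the JDK.
        Class<?> loaded = loadFirstAvailable("com.example.MissingDriver", "java.lang.String");
        System.out.println(loaded.getName());  // prints java.lang.String
    }
}
```

Because the fallback call is not wrapped in its own `try`, a missing fallback class surfaces to the caller as `ClassNotFoundException`, which matches the `throws` clause declared on `getOrEstablishConnection()`.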
