
Commit

[feature] merge dev
XiaoJiang521 committed Sep 12, 2023
2 parents f7fa31a + 38b6d6e commit 9bd01d2
Showing 43 changed files with 882 additions and 64 deletions.
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
@@ -47,6 +47,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| max_commit_attempts | Int | No | 3 |
| transaction_timeout_sec | Int | No | -1 |
| auto_commit | Boolean | No | true |
| field_ide | String | No | - |
| common-options | | No | - |

### driver [string]
@@ -136,6 +137,12 @@ exactly-once semantics

Automatic transaction commit is enabled by default

### field_ide [String]

The `field_ide` option controls whether field names are converted to uppercase or lowercase when
synchronizing from the source to the sink. `ORIGINAL` means no conversion, `UPPERCASE` converts
field names to uppercase, and `LOWERCASE` converts them to lowercase.
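
A minimal Jdbc sink configuration using `field_ide` might look like this (a sketch only — the connection values, database, and table names are placeholders):

```
sink {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456"
        generate_sink_sql = true
        database = test
        table = sink_table
        field_ide = UPPERCASE
    }
}
```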

### common options

Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details
10 changes: 6 additions & 4 deletions docs/en/connector-v2/sink/Mysql.md
@@ -78,6 +78,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Specifies whether field names are converted when synchronizing from the source to the sink. `ORIGINAL` means no conversion; `UPPERCASE` converts to uppercase; `LOWERCASE` converts to lowercase. |
| common-options | | No | - | Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details |

### Tips
@@ -122,7 +123,7 @@ transform {
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
@@ -140,7 +141,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
@@ -159,7 +160,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
max_retries = 0
@@ -181,7 +182,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
@@ -191,6 +192,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
}
}
```
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/PostgreSql.md
@@ -81,6 +81,7 @@ semantics (using XA transaction guarantee).
| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures |
| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default |
| field_ide | String | No | - | Specifies whether field names are converted when synchronizing from the source to the sink. `ORIGINAL` means no conversion; `UPPERCASE` converts to uppercase; `LOWERCASE` converts to lowercase. |
| common-options | | No | - | Sink plugin common parameters; please refer to [Sink Common Options](common-options.md) for details |

### Tips
@@ -197,6 +198,7 @@ sink {
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
}
}
```
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Clickhouse.md
@@ -66,7 +66,7 @@ The following example demonstrates how to create a data synchronization job that
```bash
# Set the basic configuration of the task to be performed
env {
execution.parallelism = 1
execution.parallelism = 10
job.mode = "BATCH"
}

28 changes: 19 additions & 9 deletions docs/en/connector-v2/source/Jdbc.md
@@ -145,15 +145,25 @@ Jdbc {
parallel:

```
Jdbc {
url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from type_bin"
partition_column = "id"
partition_num = 10
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://localhost/test?serverTimezone=GMT%2b8"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from type_bin"
partition_column = "id"
partition_num = 10
}
}
sink {
Console {}
}
```

7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/MongoDB.md
@@ -283,6 +283,10 @@ By utilizing `flat.sync-string`, only one field attribute value can be set, and
This operation will perform a string mapping on a single MongoDB data entry.
```bash
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source {
MongoDB {
uri = "mongodb://user:[email protected]:27017"
@@ -296,6 +300,9 @@ source {
}
}
}
sink {
Console {}
}
```
With the modified parameters, the synchronized data samples look like the following:
13 changes: 10 additions & 3 deletions docs/en/connector-v2/source/Mysql.md
@@ -94,7 +94,7 @@ env {
}
source{
Jdbc {
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
@@ -118,9 +118,13 @@ sink {
> Read your query table in parallel using the shard field and the shard data you configured. You can do this if you want to read the whole table.
```
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
@@ -133,6 +137,9 @@ source {
partition_num = 10
}
}
sink {
Console {}
}
```

### Parallel Boundary:
@@ -142,7 +149,7 @@ source {
```
source {
Jdbc {
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/OceanBase.md
@@ -127,6 +127,10 @@ sink {
> Read your query table in parallel using the shard field and the shard data you configured. You can do this if you want to read the whole table.
```
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = "com.oceanbase.jdbc.Driver"
@@ -141,6 +145,9 @@ source {
partition_num = 10
}
}
sink {
Console {}
}
```

### Parallel Boundary:
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/Oracle.md
@@ -111,6 +111,10 @@ sink {
> Read your query table in parallel using the shard field and the shard data you configured. You can do this if you want to read the whole table.
```
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
@@ -126,6 +130,9 @@ source {
partition_num = 10
}
}
sink {
Console {}
}
```

### Parallel Boundary:
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/PostgreSQL.md
@@ -120,6 +120,10 @@ sink {
> Read your query table in parallel using the shard field and the shard data you configured. You can do this if you want to read the whole table.
```
env {
execution.parallelism = 10
job.mode = "BATCH"
}
source{
jdbc{
url = "jdbc:postgresql://localhost:5432/test"
@@ -131,6 +135,9 @@ source{
partition_num = 5
}
}
sink {
Console {}
}
```

### Parallel Boundary:
@@ -27,6 +27,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -76,7 +77,7 @@

@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState>, SupportCoordinate {

protected ReadonlyConfig readonlyConfig;
protected SourceConfig.Factory<C> configFactory;
@@ -35,6 +35,8 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
import java.util.Objects;

@@ -54,10 +56,11 @@ private HiveMetaStoreProxy(Config config) {
Configuration configuration = new Configuration();
FileSystemUtils.doKerberosAuthentication(configuration, principal, keytabPath);
}
if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
hiveConf.addResource(config.getString(HiveConfig.HIVE_SITE_PATH.key()));
}
try {
if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key());
hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
}
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
} catch (MetaException e) {
String errorMsg =
@@ -67,6 +70,14 @@ private HiveMetaStoreProxy(Config config) {
metastoreUri);
throw new HiveConnectorException(
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
} catch (MalformedURLException e) {
String errorMsg =
String.format(
"Using this hive uris [%s], hive conf [%s] to initialize "
+ "hive metastore client instance failed",
metastoreUri, config.getString(HiveConfig.HIVE_SITE_PATH.key()));
throw new HiveConnectorException(
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
}
}

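The Hive change above passes the configured `hive-site.xml` path to `addResource` as a `file:` URL (hence the new `MalformedURLException` branch) instead of as a bare string. The conversion itself can be sketched without any Hive or Hadoop dependency; the path below is a placeholder:

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

public class HiveSitePathDemo {

    /** Turns a local file path into the file: URL form that addResource expects. */
    static URL toResourceUrl(String hiveSitePath) throws MalformedURLException {
        return new File(hiveSitePath).toURI().toURL();
    }

    public static void main(String[] args) throws MalformedURLException {
        // On a Unix-like system this prints file:/etc/hive/conf/hive-site.xml
        System.out.println(toResourceUrl("/etc/hive/conf/hive-site.xml"));
    }
}
```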
@@ -404,7 +404,17 @@ private void addHeaders(HttpRequestBase request, Map<String, String> headers) {
headers.forEach(request::addHeader);
}

private boolean checkAlreadyHaveContentType(HttpEntityEnclosingRequestBase request) {
if (request.getEntity() != null && request.getEntity().getContentType() != null) {
return HTTP.CONTENT_TYPE.equals(request.getEntity().getContentType().getName());
}
return false;
}

private void addBody(HttpEntityEnclosingRequestBase request, String body) {
if (checkAlreadyHaveContentType(request)) {
return;
}
request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON);

if (StringUtils.isBlank(body)) {
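The new `checkAlreadyHaveContentType` guard keeps `addBody` from overwriting a `Content-Type` that the request entity already declares. The same idea, reduced to a plain header map rather than HttpClient's request types (a sketch, not the connector's actual code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContentTypeGuardDemo {
    static final String CONTENT_TYPE = "Content-Type";

    /** Adds a default Content-Type only when none is present yet. */
    static void addDefaultContentType(Map<String, String> headers, String defaultType) {
        if (headers.containsKey(CONTENT_TYPE)) {
            return; // keep the caller-supplied value, mirroring the early return above
        }
        headers.put(CONTENT_TYPE, defaultType);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(CONTENT_TYPE, "text/xml");
        addDefaultContentType(headers, "application/json");
        System.out.println(headers.get(CONTENT_TYPE)); // text/xml
    }
}
```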
@@ -20,6 +20,9 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.sql.Connection;
@@ -68,9 +71,13 @@ default String hashModForField(String fieldName, int mod) {
default String quoteIdentifier(String identifier) {
return identifier;
}
/** Quotes the identifier for database name or field name */
default String quoteDatabaseIdentifier(String identifier) {
return identifier;
}

default String tableIdentifier(String database, String tableName) {
return quoteIdentifier(database) + "." + quoteIdentifier(tableName);
return quoteDatabaseIdentifier(database) + "." + quoteIdentifier(tableName);
}

/**
@@ -219,4 +226,18 @@ default ResultSetMetaData getResultSetMetaData(
default String extractTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

default String getFieldIde(String identifier, String fieldIde) {
if (StringUtils.isEmpty(fieldIde)) {
return identifier;
}
switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) {
case LOWERCASE:
return identifier.toLowerCase();
case UPPERCASE:
return identifier.toUpperCase();
default:
return identifier;
}
}
}
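
`getFieldIde` above dispatches on SeaTunnel's `FieldIdeEnum`. A self-contained sketch of the same dispatch, with a local stand-in for that enum, behaves as follows:

```java
public class FieldIdeDemo {
    // Local stand-in for org.apache.seatunnel...FieldIdeEnum
    enum FieldIde { ORIGINAL, UPPERCASE, LOWERCASE }

    /** Mirrors JdbcDialect.getFieldIde: null/empty means no conversion. */
    static String applyFieldIde(String identifier, String fieldIde) {
        if (fieldIde == null || fieldIde.isEmpty()) {
            return identifier;
        }
        switch (FieldIde.valueOf(fieldIde.toUpperCase())) {
            case LOWERCASE:
                return identifier.toLowerCase();
            case UPPERCASE:
                return identifier.toUpperCase();
            default:
                return identifier;
        }
    }

    public static void main(String[] args) {
        System.out.println(applyFieldIde("UserName", "LOWERCASE")); // username
        System.out.println(applyFieldIde("UserName", null));        // UserName
    }
}
```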
@@ -44,7 +44,7 @@ public interface JdbcDialectFactory {
* @param compatibleMode The compatible mode
* @return a new instance of {@link JdbcDialect}
*/
default JdbcDialect create(String compatibleMode) {
default JdbcDialect create(String compatibleMode, String fieldId) {
return create();
}
}
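
The `create(String, String)` change above follows a common pattern for evolving an interface: the new overload gains extra parameters but defaults to the original `create()`, so existing factories keep compiling and behaving as before. A standalone sketch (all names here are illustrative, not SeaTunnel's):

```java
public class DialectFactoryDemo {
    interface Dialect {}

    static class DefaultDialect implements Dialect {}

    /** New overloads default to the no-arg create(); dialects that need the
     *  extra arguments override the overload. */
    interface DialectFactory {
        Dialect create();

        default Dialect create(String compatibleMode, String fieldIde) {
            return create();
        }
    }

    static String dialectName() {
        DialectFactory factory = DefaultDialect::new;
        return factory.create("mysql", "UPPERCASE").getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(dialectName()); // DefaultDialect
    }
}
```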