[Improve][Connector-V2] Support dynamic write mode for JDBC Sink #8305

Open · wants to merge 6 commits into `dev`
37 changes: 29 additions & 8 deletions docs/en/connector-v2/sink/Jdbc.md
@@ -29,7 +29,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.

## Options

| Name                                      | Type    | Required | Default                      |
|-------------------------------------------|---------|----------|------------------------------|
| url | String | Yes | - |
| driver | String | Yes | - |
@@ -57,8 +57,11 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| data_save_mode | Enum | No | APPEND_DATA |
| custom_sql | String | No | - |
| enable_upsert | Boolean | No | true |
| use_copy_statement | Boolean | No | false |

> **Member (review comment):** we need to keep backward compatibility, please do not delete it.

| create_index | Boolean | No | true |
| write_mode | Enum | No | sql |
| temp_table_name | String | No | - |
| temp_column_batch_code | String | No | - |
| temp_column_row_kind | String | No | - |

### driver [string]

@@ -203,18 +206,36 @@ When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL

Enable upsert based on the configured primary keys. If the task has no duplicate data on the keys, setting this parameter to `false` can speed up data import.

### use_copy_statement [boolean]

Use the `COPY ${table} FROM STDIN` statement to import data. Only drivers whose connections expose a `getCopyAPI()` method are supported, e.g. the PostgreSQL driver `org.postgresql.Driver`.

NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.

### create_index [boolean]

Whether to create indexes (including the primary key and any other indexes) when auto-creating the table. You can use this option to improve the performance of JDBC writes when migrating large tables.

Notice: this sacrifices read performance, so you will need to create the indexes manually after the migration to restore read performance.

### write_mode [Enum]

Five write modes are supported: SQL, COPY, COPY_SQL, MERGE, COPY_MERGE.

- SQL (default): the traditional JDBC SQL mode, supporting both full and incremental writes.
- COPY: imports data using the COPY command (requires database support, e.g. PostgreSQL); only supports full writes.
- COPY_SQL: imports data using the COPY command (requires database support) and dynamically switches to SQL mode when incremental data arrives.
- MERGE: imports into a temporary table using the COPY command and then MERGEs into the target table (both require database support); supports both full and incremental writes.
- COPY_MERGE: imports full data into the target table using the COPY command (requires database support); when incremental data arrives, dynamically switches to importing into a temporary table via COPY and then MERGEs into the target table (requires database support); supports both full and incremental writes.

NOTICE: when using the MERGE/COPY_MERGE write modes, a temporary table with the same structure as the target table is created automatically.

### temp_table_name [String]

The temporary table name used in the MERGE/COPY_MERGE write modes. If not specified, the system generates one from the original table name with the suffix `_tmp`.

### temp_column_batch_code [String]

The temporary column used to batch-write data in the MERGE/COPY_MERGE write modes. If not specified, it defaults to the `__st_batch_code` column.

### temp_column_row_kind [String]

The temporary column used to identify the row kind in the MERGE/COPY_MERGE write modes. If not specified, it defaults to the `__st_row_kind` column.

## tips

In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup:
30 changes: 25 additions & 5 deletions docs/zh/connector-v2/sink/Jdbc.md
@@ -55,7 +55,11 @@
| data_save_mode                            | Enum    | No       | APPEND_DATA                  |
| custom_sql                                | String  | No       | -                            |
| enable_upsert                             | Boolean | No       | true                         |
| use_copy_statement                        | Boolean | No       | false                        |
| write_mode                                | Enum    | No       | sql                          |
| temp_table_name                           | String  | No       | -                            |
| temp_column_batch_code                    | String  | No       | -                            |
| temp_column_row_kind                      | String  | No       | -                            |


### driver [string]

@@ -197,12 +201,28 @@ For common Sink plugin parameters, please refer to [Sink Common Options](../sink-common-options.md)

Enable upsert via the primary keys. If the task has no duplicate data on the keys, setting this parameter to `false` can speed up data import.

### use_copy_statement [boolean]
### write_mode [Enum]

Five write modes are supported: SQL, COPY, COPY_SQL, MERGE, COPY_MERGE
- SQL (default): the traditional JDBC SQL mode, supporting both full and incremental writes
- COPY: imports data using the COPY command (requires database support, e.g. PostgreSQL); only supports full writes
- COPY_SQL: imports data using the COPY command (requires database support) and dynamically switches to SQL mode when incremental data arrives
- MERGE: imports into a temporary table using the COPY command and then MERGEs into the target table (both require database support); supports both full and incremental writes
- COPY_MERGE: imports full data into the target table using the COPY command (requires database support); when incremental data arrives, dynamically switches to importing into a temporary table via COPY and then MERGEs into the target table; supports both full and incremental writes

Notice: when using the MERGE/COPY_MERGE write modes, a temporary table with the same structure as the target table is created automatically.

### temp_table_name [String]

The temporary table name used in the MERGE/COPY_MERGE write modes. If not specified, the system generates one from the original table name with the suffix `_tmp`.

### temp_column_batch_code [String]

The temporary column used to batch-write data in the MERGE/COPY_MERGE write modes. If not specified, it defaults to the `__st_batch_code` column.

Use the `COPY ${table} FROM STDIN` statement to import data. Only drivers whose connections expose a `getCopyAPI()` method are supported, e.g. the PostgreSQL driver `org.postgresql.Driver`.
### temp_column_row_kind [String]

Notice: `MAP`, `ARRAY`, `ROW` types are not supported.
The temporary column used to identify the row kind in the MERGE/COPY_MERGE write modes. If not specified, it defaults to the `__st_row_kind` column.

## tips

@@ -172,7 +172,8 @@ protected void dropTable() {
catalog.dropTable(tablePath, true);
}

protected void createTablePreCheck() {
public static void createTablePreCheck(
TablePath tablePath, Catalog catalog, CatalogTable catalogTable) {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
try {
log.info(
@@ -199,7 +200,7 @@ protected void createTablePreCheck() {
}

protected void createTable() {
createTablePreCheck();
createTablePreCheck(tablePath, catalog, catalogTable);
catalog.createTable(tablePath, catalogTable, true);
isNewTableCreated = true;
}
@@ -170,6 +170,8 @@ public static final class Builder {
public String kerberosKeytabPath;
public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();

public String tempTableName;

private Builder() {}

public Builder url(String url) {
@@ -152,18 +152,39 @@ public interface JdbcOptions {
.defaultValue(true)
.withDescription(
"is the primary key updated when performing an update operation");

Option<JdbcSinkConfig.WriteMode> WRITE_MODE =
Options.key("write_mode")
.enumType(JdbcSinkConfig.WriteMode.class)
.defaultValue(JdbcSinkConfig.WriteMode.SQL)
.withDescription("write mode: SQL/COPY/COPY_SQL/MERGE/COPY_MERGE");

String REPLACE_TARGET_TABLE_NAME_KEY = "${target_table}";

Option<String> TEMP_TABLE_NAME =
Options.key("temp_table_name")
.stringType()
.defaultValue(REPLACE_TARGET_TABLE_NAME_KEY + "_tmp")
.withDescription("temp table name");

Option<String> TEMP_COLUMN_BATCH_CODE =
Options.key("temp_column_batch_code")
.stringType()
.defaultValue("__st_batch_code")
.withDescription("temp column batch code for merge");

Option<String> TEMP_COLUMN_ROW_KIND =
Options.key("temp_column_row_kind")
.stringType()
.defaultValue("__st_row_kind")
.withDescription("temp column row kind for merge");

Option<Boolean> SUPPORT_UPSERT_BY_INSERT_ONLY =
Options.key("support_upsert_by_insert_only")
.booleanType()
.defaultValue(false)
.withDescription("support upsert by insert only");

Option<Boolean> USE_COPY_STATEMENT =
Options.key("use_copy_statement")
.booleanType()
.defaultValue(false)
.withDescription("support copy in statement (postgresql)");

/** source config */
Option<String> PARTITION_COLUMN =
Options.key("partition_column")
@@ -20,6 +20,8 @@
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

import org.apache.commons.lang3.StringUtils;

import lombok.Builder;
import lombok.Data;

@@ -28,6 +30,7 @@

import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.ENABLE_UPSERT;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.IS_PRIMARY_KEY_UPDATED;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.REPLACE_TARGET_TABLE_NAME_KEY;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.SUPPORT_UPSERT_BY_INSERT_ONLY;

@Data
@@ -43,8 +46,11 @@ public class JdbcSinkConfig implements Serializable {
private List<String> primaryKeys;
private boolean enableUpsert;
@Builder.Default private boolean isPrimaryKeyUpdated = true;
private WriteMode writeMode;
private String tempTableName;
private String tempColumnRowKind;
private String tempColumnBatchCode;
private boolean supportUpsertByInsertOnly;
private boolean useCopyStatement;
@Builder.Default private boolean createIndex = true;

public static JdbcSinkConfig of(ReadonlyConfig config) {
@@ -58,8 +64,29 @@ public static JdbcSinkConfig of(ReadonlyConfig config) {
builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
builder.supportUpsertByInsertOnly(config.get(SUPPORT_UPSERT_BY_INSERT_ONLY));
builder.simpleSql(config.get(JdbcOptions.QUERY));
builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT));
builder.createIndex(config.get(JdbcCatalogOptions.CREATE_INDEX));
builder.writeMode(config.get(JdbcOptions.WRITE_MODE));
String tempTableName = config.get(JdbcOptions.TEMP_TABLE_NAME);
if (StringUtils.isNotBlank(tempTableName)
&& config.getOptional(JdbcOptions.TABLE).isPresent()) {
String tableName = config.get(JdbcOptions.TABLE);
int index = tableName.lastIndexOf(".");
if (index > -1) {
tableName = tableName.substring(index + 1);
}
tempTableName = tempTableName.replace(REPLACE_TARGET_TABLE_NAME_KEY, tableName);
}
builder.tempTableName(tempTableName);
builder.tempColumnBatchCode(config.get(JdbcOptions.TEMP_COLUMN_BATCH_CODE));
builder.tempColumnRowKind(config.get(JdbcOptions.TEMP_COLUMN_ROW_KIND));
return builder.build();
}

public enum WriteMode {
SQL,
COPY,
COPY_SQL,
MERGE,
COPY_MERGE,
}
}
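
As a worked illustration of the `${target_table}` placeholder resolution above, the following self-contained sketch (hypothetical class and table name, not part of this PR) reproduces the substitution performed in `JdbcSinkConfig.of(...)`:

```java
// Hypothetical, self-contained illustration of how the default
// temp_table_name "${target_table}_tmp" is resolved; only the table
// name "public.orders" and this class are made up for the example.
public class TempTableNameResolutionExample {
    private static final String REPLACE_TARGET_TABLE_NAME_KEY = "${target_table}";

    public static void main(String[] args) {
        String tempTableName = REPLACE_TARGET_TABLE_NAME_KEY + "_tmp"; // option default
        String tableName = "public.orders";                            // hypothetical sink table

        // Strip the schema/database qualifier, keeping only the bare table name.
        int index = tableName.lastIndexOf(".");
        if (index > -1) {
            tableName = tableName.substring(index + 1); // -> "orders"
        }

        // "${target_table}_tmp" -> "orders_tmp"
        System.out.println(tempTableName.replace(REPLACE_TARGET_TABLE_NAME_KEY, tableName));
    }
}
```

So with the default `temp_table_name`, a sink table `public.orders` gets the temporary table `orders_tmp`.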
@@ -27,7 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.DynamicBufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
@@ -67,12 +67,11 @@ public JdbcOutputFormat build() {
jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable()));

final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
if (jdbcSinkConfig.isUseCopyStatement()) {
statementExecutorFactory =
() ->
createCopyInBufferStatementExecutor(
createCopyInBatchStatementExecutor(
dialect, table, tableSchema));
if ((JdbcSinkConfig.WriteMode.COPY.equals(jdbcSinkConfig.getWriteMode())
|| JdbcSinkConfig.WriteMode.MERGE.equals(jdbcSinkConfig.getWriteMode())
|| JdbcSinkConfig.WriteMode.COPY_MERGE.equals(jdbcSinkConfig.getWriteMode())
|| JdbcSinkConfig.WriteMode.COPY_SQL.equals(jdbcSinkConfig.getWriteMode()))) {
statementExecutorFactory = this::createDynamicBufferedExecutor;
} else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
statementExecutorFactory =
() ->
@@ -107,6 +106,42 @@ public JdbcOutputFormat build() {
statementExecutorFactory);
}

private JdbcBatchStatementExecutor<SeaTunnelRow> createDynamicBufferedExecutor() {
final String database = jdbcSinkConfig.getDatabase();
final TablePath tablePath =
TablePath.of(jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable());
final String table = dialect.extractTableName(tablePath);

JdbcBatchStatementExecutor<SeaTunnelRow> bufferReducedBatchStatementExecutor = null;
final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
if (JdbcSinkConfig.WriteMode.COPY_SQL.equals(jdbcSinkConfig.getWriteMode())) {
if (primaryKeys == null || primaryKeys.isEmpty()) {
throw new RuntimeException(
"Primary key is not set, can not execute upsert operation");
}
bufferReducedBatchStatementExecutor =
createUpsertBufferedExecutor(
dialect,
database,
table,
tableSchema,
databaseTableSchema,
primaryKeys.toArray(new String[0]),
jdbcSinkConfig.isEnableUpsert(),
jdbcSinkConfig.isPrimaryKeyUpdated(),
jdbcSinkConfig.isSupportUpsertByInsertOnly());
}
final JdbcBatchStatementExecutor<SeaTunnelRow> finalBufferReducedBatchStatementExecutor =
bufferReducedBatchStatementExecutor;
return new DynamicBufferedBatchStatementExecutor(
tablePath,
tableSchema,
dialect,
jdbcSinkConfig,
finalBufferReducedBatchStatementExecutor,
Function.identity());
}

private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor(
JdbcDialect dialect,
String database,
@@ -222,22 +257,6 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(
isPrimaryKeyUpdated);
}

private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(
CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) {
return new BufferedBatchStatementExecutor(
copyManagerBatchStatementExecutor, Function.identity());
}

private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(
JdbcDialect dialect, String table, TableSchema tableSchema) {
String columns =
Arrays.stream(tableSchema.getFieldNames())
.map(dialect::quoteIdentifier)
.collect(Collectors.joining(",", "(", ")"));
String copyInSql = String.format("COPY %s %s FROM STDIN WITH CSV", table, columns);
return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
}

private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
JdbcDialect dialect,
String database,
@@ -352,7 +371,7 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor(
rowConverter);
}

static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
public static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
return row -> {
Object[] fields = new Object[pkFields.length];
for (int i = 0; i < pkFields.length; i++) {
@@ -238,6 +238,16 @@ default String getRowExistsStatement(
Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields);

default Optional<String> getMergeStatement(
String sourceSQL,
String database,
String tableName,
String[] fieldNames,
String[] uniqueKeyFields,
boolean isPrimaryKeyUpdated) {
return Optional.empty();
}
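
The default implementation returns `Optional.empty()`, so MERGE support is opt-in per dialect. A minimal sketch of what an override could look like, assuming an ANSI/PostgreSQL-15-style `MERGE INTO ... USING` syntax — the SQL shape, alias names, and key-skipping logic are illustrative assumptions, not the PR's actual dialect code, and the snippet assumes `java.util.*` and `java.util.stream.Collectors` imports while omitting identifier quoting:

```java
// Illustrative only: an ANSI-style MERGE built from the method's arguments.
// A real dialect must use its database's own syntax and quote identifiers.
@Override
public Optional<String> getMergeStatement(
        String sourceSQL,
        String database,
        String tableName,
        String[] fieldNames,
        String[] uniqueKeyFields,
        boolean isPrimaryKeyUpdated) {
    Set<String> keys = new HashSet<>(Arrays.asList(uniqueKeyFields));
    // Join condition on the unique-key columns.
    String onClause =
            Arrays.stream(uniqueKeyFields)
                    .map(k -> String.format("target.%s = source.%s", k, k))
                    .collect(Collectors.joining(" AND "));
    // Columns updated on match; optionally skip the key columns.
    String updateClause =
            Arrays.stream(fieldNames)
                    .filter(f -> isPrimaryKeyUpdated || !keys.contains(f))
                    .map(f -> String.format("%s = source.%s", f, f))
                    .collect(Collectors.joining(", "));
    String insertColumns = String.join(", ", fieldNames);
    String insertValues =
            Arrays.stream(fieldNames)
                    .map(f -> "source." + f)
                    .collect(Collectors.joining(", "));
    return Optional.of(
            String.format(
                    "MERGE INTO %s.%s AS target USING (%s) AS source ON (%s) "
                            + "WHEN MATCHED THEN UPDATE SET %s "
                            + "WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)",
                    database, tableName, sourceSQL, onClause,
                    updateClause, insertColumns, insertValues));
}
```

A production dialect would additionally quote identifiers via `quoteIdentifier` and adapt the statement to its database, e.g. Oracle's `MERGE` variant or an upsert equivalent where no native MERGE exists.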

/**
* Different dialects optimize their PreparedStatement
*