Commit

Merge branch 'dev' into create-pulsar-sink
lightzhao committed Sep 12, 2023
2 parents de45934 + 5e37883 commit 8a08656
Showing 69 changed files with 1,917 additions and 1,310 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -48,4 +48,5 @@ test.conf
spark-warehouse
*.flattened-pom.xml

seatunnel-examples
seatunnel-examples
/lib/*
60 changes: 60 additions & 0 deletions bin/install-plugin.cmd
@@ -0,0 +1,60 @@
@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements. See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License. You may obtain a copy of the License at
REM
REM http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

REM This script downloads the connector plug-ins required at runtime.
REM All plug-ins are downloaded by default; to install only some of them,
REM list the plug-in names in config\plugin_config.

REM Get seatunnel home
set "SEATUNNEL_HOME=%~dp0..\"
echo Set SEATUNNEL_HOME to [%SEATUNNEL_HOME%]

REM The default connector version is 2.3.3. To use another version, pass it as the first argument, e.g. install-plugin.cmd 2.1.2
set "version=2.3.3"
if not "%~1"=="" set "version=%~1"
echo Install hadoop shade jar, usage version is %version%

REM Create the lib directory
if not exist "%SEATUNNEL_HOME%\lib" (
mkdir "%SEATUNNEL_HOME%\lib"
echo create lib directory
)

call "%SEATUNNEL_HOME%\mvnw.cmd" dependency:get -DgroupId="org.apache.seatunnel" -Dclassifier="optional" -DartifactId="seatunnel-hadoop3-3.1.4-uber" -Dversion="%version%" -Ddest="%SEATUNNEL_HOME%\lib"

echo Install SeaTunnel connectors plugins, usage version is %version%

REM Create the connectors directory
if not exist "%SEATUNNEL_HOME%\connectors" (
mkdir "%SEATUNNEL_HOME%\connectors"
echo create connectors directory
)

REM Create the seatunnel connectors directory (for v2)
if not exist "%SEATUNNEL_HOME%\connectors\seatunnel" (
mkdir "%SEATUNNEL_HOME%\connectors\seatunnel"
echo create seatunnel connectors directory
)

for /f "usebackq delims=" %%a in ("%SEATUNNEL_HOME%\config\plugin_config") do (
set "line=%%a"
setlocal enabledelayedexpansion
if "!line:~0,1!" neq "-" if "!line:~0,1!" neq "#" (
echo install connector : !line!
call "%SEATUNNEL_HOME%\mvnw.cmd" dependency:get -DgroupId="org.apache.seatunnel" -DartifactId="!line!" -Dversion="%version%" -Ddest="%SEATUNNEL_HOME%\connectors\seatunnel"
)
endlocal
)
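
Reviewer note: the loop above keeps every line of `config\plugin_config` whose first character is neither `-` nor `#` and passes it to Maven as an artifact ID. The filter can be sketched in Python as below. The function name `select_connectors` and the sample lines are hypothetical, and the sketch only assumes that blank lines are skipped (as `for /f` does); it does not model every `for /f` parsing rule.

```python
def select_connectors(lines):
    """Return connector names, skipping section markers and comments."""
    selected = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # Assumption: empty lines are ignored, as `for /f` does.
            continue
        if line[0] in ("-", "#"):
            # Lines starting with "-" or "#" are not connector names.
            continue
        selected.append(line)
    return selected


# Hypothetical file content, for illustration only.
sample = [
    "--section--",
    "# a comment",
    "connector-file-ftp",
    "",
    "connector-file-s3",
]
print(select_connectors(sample))
```

Each returned name would then be used as the `-DartifactId` value in the `mvnw.cmd dependency:get` call.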
2 changes: 1 addition & 1 deletion config/plugin_config
@@ -36,7 +36,7 @@ connector-file-ftp
connector-file-hadoop
connector-file-local
connector-file-oss
connector-file-oss-jindo
connector-file-jindo-oss
connector-file-s3
connector-file-sftp
connector-google-sheets
21 changes: 21 additions & 0 deletions config/seatunnel-env.cmd
@@ -0,0 +1,21 @@
@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements. See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License. You may obtain a copy of the License at
REM
REM http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

REM Home directory of spark distribution.
if "%SPARK_HOME%" == "" set "SPARK_HOME=C:\Program Files\spark"

REM Home directory of flink distribution.
if "%FLINK_HOME%" == "" set "FLINK_HOME=C:\Program Files\flink"
118 changes: 82 additions & 36 deletions docs/en/connector-v2/sink/IoTDB.md
@@ -50,10 +50,10 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee

| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| node_urls | Array | Yes | - | `IoTDB` cluster address, the format is `["host:port", ...]` |
| node_urls | String | Yes | - | `IoTDB` cluster address, the format is `"host1:port"` or `"host1:port,host2:port"` |
| username | String | Yes | - | `IoTDB` user username |
| password | String | Yes | - | `IoTDB` user password |
| key_device | String | No | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
| key_device | String | Yes | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
| key_timestamp | String | No | processing time | Specify field-name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp |
| key_measurement_fields | Array | No | exclude `device` & `timestamp` | Specify field-name of the `IoTDB` measurement list in SeaTunnelRow. If not specified, include all fields but exclude `device` & `timestamp` |
| storage_group | Array | No | - | Specify device storage group(path prefix) <br/> example: deviceId = ${storage_group} + "." + ${key_device} |
@@ -68,78 +68,124 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee
| connection_timeout_in_ms | Integer | No | - | The maximum time (in ms) to wait when connecting to `IoTDB` |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
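
Reviewer note: two rows of the options table describe string formats, the new `node_urls` form (`"host1:port,host2:port"`) and the `deviceId = ${storage_group} + "." + ${key_device}` rule. A minimal Python sketch of both follows. The helper names `split_node_urls` and `device_id` are hypothetical, trimming whitespace around each entry is an assumption, and `storage_group` is treated as a single path prefix string even though the table types it as an array. This is not the connector's actual parsing code.

```python
def split_node_urls(node_urls):
    """Split "host1:port,host2:port" into (host, port) pairs."""
    pairs = []
    for item in node_urls.split(","):
        # Assumption: surrounding whitespace is not significant.
        host, _, port = item.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        pairs.append((host, int(port)))
    return pairs


def device_id(storage_group, device):
    """Compose deviceId = storage_group + "." + key_device value."""
    return f"{storage_group}.{device}"


print(split_node_urls("localhost:6667"))
print(device_id("root.test_group", "device_a"))
```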

## Task Example
## Examples

```hocon
env {
execution.parallelism = 2
job.mode = "BATCH"
}
source {
FakeSource {
row.num = 16
bigint.template = [1664035200001]
schema = {
fields {
device_name = "string"
temperature = "float"
moisture = "int"
event_ts = "bigint"
c_string = "string"
c_boolean = "boolean"
c_tinyint = "tinyint"
c_smallint = "smallint"
c_int = "int"
c_bigint = "bigint"
c_float = "float"
c_double = "double"
}
}
}
}
...
```

The upstream SeaTunnelRow data has the following format:

| device_name | temperature | moisture | event_ts | c_string | c_boolean | c_tinyint | c_smallint | c_int | c_bigint | c_float | c_double |
|--------------------------|-------------|----------|---------------|----------|-----------|-----------|------------|-------|------------|---------|----------|
| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | true | 1 | 1 | 1 | 2147483648 | 1.0 | 1.0 |
| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | false | 2 | 2 | 2 | 2147483649 | 2.0 | 2.0 |
| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | false | 3 | 3 | 3 | 2147483649 | 3.0 | 3.0 |

### Case1

Common options:
Only the required options are set.
The current processing time is used as the timestamp, and every field except `device` and `timestamp` is written as a measurement field.

```hocon
sink {
IoTDB {
node_urls = ["localhost:6667"]
node_urls = "localhost:6667"
username = "root"
password = "root"
batch_size = 1024
key_device = "device_name" # use the device_name field as the `deviceId`
}
}
```

When you assign `key_device` is `device_name`, for example:
The data written to `IoTDB` has the following format:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc3| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+
```
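
Reviewer note: the Case1 output is consistent with the default described in the options table, where every field except the device field (and the timestamp field, when one is configured) becomes a measurement. A minimal Python sketch of that selection rule follows. The function name `measurement_fields` is hypothetical and the logic is read from the table wording, not from the connector source.

```python
def measurement_fields(fields, key_device, key_timestamp=None,
                       key_measurement_fields=None):
    """Pick the measurement columns for a row schema."""
    if key_measurement_fields is not None:
        # An explicit list wins over the default.
        return list(key_measurement_fields)
    excluded = {key_device}
    if key_timestamp is not None:
        excluded.add(key_timestamp)
    return [name for name in fields if name not in excluded]


schema = ["device_name", "temperature", "moisture", "event_ts", "c_string"]

# Case1: only key_device is set, so event_ts stays a measurement.
print(measurement_fields(schema, "device_name"))

# Explicit list, as in key_measurement_fields = ["temperature", "moisture"].
print(measurement_fields(schema, "device_name", "event_ts",
                         ["temperature", "moisture"]))
```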

### Case2

Use the source event's time as the timestamp.

```hocon
sink {
IoTDB {
...
key_device = "device_name"
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name" # use the device_name field as the `deviceId`
key_timestamp = "event_ts" # use the event_ts field as the `timestamp`
}
}
```

The upstream SeaTunnelRow data has the following format:

| device_name | field_1 | field_2 |
|--------------------------|---------|---------|
| root.test_group.device_a | 1001 | 1002 |
| root.test_group.device_b | 2001 | 2002 |
| root.test_group.device_c | 3001 | 3002 |

The data written to `IoTDB` has the following format:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+-----------+----------+
| Time| Device| field_1| field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002|
+------------------------+------------------------+----------+-----------+
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc3| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+
```
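
Reviewer note: with `key_timestamp = "event_ts"`, the `Time` column is derived from the epoch-millisecond value `1664035200001`. The Python sketch below converts that value; the helper name `epoch_ms_to_iso` is hypothetical. In UTC the value is `2022-09-24T16:00:00.001`, so the wall-clock time shown in the sample output (`2022-09-25T00:00:00.001`) corresponds to a UTC+8 display zone. That reading of the sample is an inference, not something the commit states.

```python
from datetime import datetime, timedelta, timezone


def epoch_ms_to_iso(ms, tz=timezone.utc):
    """Format an epoch-millisecond timestamp as ISO 8601 in the given zone."""
    seconds, millis = divmod(ms, 1000)
    moment = datetime.fromtimestamp(seconds, tz=tz)
    moment = moment.replace(microsecond=millis * 1000)
    return moment.isoformat(timespec="milliseconds")


utc_plus_8 = timezone(timedelta(hours=8))
print(epoch_ms_to_iso(1664035200001))              # UTC
print(epoch_ms_to_iso(1664035200001, utc_plus_8))  # UTC+8 wall clock
```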

### Case2
### Case3

When you assign `key_device`, `key_timestamp`, and `key_measurement_fields`, for example:
Use the source event's time and limit the measurement fields.

```hocon
sink {
IoTDB {
...
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name"
key_timestamp = "ts"
key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```

The upstream SeaTunnelRow data has the following format:

| ts | device_name | field_1 | field_2 | temperature | moisture |
|---------------|--------------------------|---------|---------|-------------|----------|
| 1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
| 1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
| 1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |

The data written to `IoTDB` has the following format:

```shell
12 changes: 8 additions & 4 deletions docs/en/connector-v2/sink/Mysql.md
@@ -2,6 +2,10 @@

> JDBC Mysql Sink Connector
## Support Mysql Version

- 5.5/5.6/5.7/8.0

## Support Those Engines

> Spark<br/>
@@ -118,7 +122,7 @@ transform {
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
@@ -136,7 +140,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
@@ -155,7 +159,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
max_retries = 0
@@ -177,7 +181,7 @@ sink {
```
sink {
jdbc {
url = "jdbc:mysql://localhost:3306/test"
url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Clickhouse.md
@@ -66,7 +66,7 @@ The following example demonstrates how to create a data synchronization job that
```bash
# Set the basic configuration of the task to be performed
env {
execution.parallelism = 1
execution.parallelism = 10
job.mode = "BATCH"
}

4 changes: 2 additions & 2 deletions docs/en/connector-v2/source/FtpFile.md
@@ -58,9 +58,9 @@ The target ftp host is required

The target ftp port is required

### username [string]
### user [string]

The target ftp username is required
The target ftp user name is required

### password [string]
