diff --git a/docs/content/connector-sink.md b/docs/content/connector-sink.md
index 8ff4c4a2..8ab305b1 100644
--- a/docs/content/connector-sink.md
+++ b/docs/content/connector-sink.md
@@ -1,41 +1,47 @@
 # Continuously load data from Apache Flink®
 
-StarRocks provides a self-developed connector named Flink Connector for Apache Flink® (Flink connector for short) to help you load data into a StarRocks table by using Flink. The basic principle is to accumulate the data and then load it all at a time into StarRocks through [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD).
-The Flink connector supports DataStream API, Table API & SQL, and Python API. The Flink connector also has a higher performance than [flink-connector-jdbc](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/) provided by Apache Flink®.
+StarRocks provides a self-developed connector named Flink connector for Apache Flink® (Flink connector for short) to help you load data into a StarRocks table by using Flink. The basic principle is that the Flink connector accumulates the data in memory and then loads it all at a time into StarRocks through [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD).
+
+The Flink connector supports DataStream API, Table API & SQL, and Python API. It provides higher and more stable performance than [flink-connector-jdbc](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/) provided by Apache Flink®.
+
+> **NOTICE**
+>
+> Loading data into StarRocks tables with the Flink connector requires the SELECT and INSERT privileges. If you do not have these privileges, follow the instructions provided in [GRANT](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/account-management/GRANT) to grant these privileges to the user that you use to connect to your StarRocks cluster.
 
 ## Version requirements
 
 | Connector | Flink                    | StarRocks     | Java | Scala     |
 |-----------|--------------------------|---------------|------|-----------|
-| 1.2.7     | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.7     | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8    | 2.11,2.12 |
 
 ## Obtain Flink connector
 
 You can obtain the Flink connector JAR file in the following ways:
 
-- Directly download the compiled Flink Connector JAR file.
+- Directly download the compiled Flink connector JAR file.
 - Add the Flink connector as a dependency in your Maven project and then download the JAR file.
-- Compile the source code of the Flink Connector into a JAR file by yourself.
+- Compile the source code of the Flink connector into a JAR file by yourself.
 
-The naming format of the Flink connector JAR file
+The naming format of the Flink connector JAR file is as follows:
 
-* Since Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. For example, if you install Flink 1.15 and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.15.jar`
+- Since Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. For example, if you install Flink 1.15 and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.15.jar`.
 
-* Prior to Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. For example, if you install Flink 1.14 and Scala 2.12 in your environment, and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`.
+- Prior to Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. For example, if you install Flink 1.14 and Scala 2.12 in your environment, and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`.
 
 > **NOTICE**
 >
 > In general, the latest version of the Flink connector only maintains compatibility with the three most recent versions of Flink.
 
+### Download the compiled JAR file
 
-### Download the compiled Jar file
-
-Directly download the corresponding version of the Flink connector JAR from the [Maven Central Repository](https://repo1.maven.org/maven2/com/starrocks).
+Directly download the corresponding version of the Flink connector JAR file from the [Maven Central Repository](https://repo1.maven.org/maven2/com/starrocks).
 
 ### Maven Dependency
 
 In your Maven project's `pom.xml` file, add the Flink connector as a dependency according to the following format. Replace `flink_version`, `scala_version`, and `connector_version` with the respective versions.
 
-* Since Flink 1.15
+
+- In Flink 1.15 and later
+
   ```xml
   <dependency>
       <groupId>com.starrocks</groupId>
       <artifactId>flink-connector-starrocks</artifactId>
       <version>${connector_version}_flink-${flink_version}</version>
   </dependency>
   ```
@@ -43,7 +49,9 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
 
-* Prior to Flink 1.15
+
+- In versions earlier than Flink 1.15
+
   ```xml
   <dependency>
       <groupId>com.starrocks</groupId>
       <artifactId>flink-connector-starrocks</artifactId>
       <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
   </dependency>
   ```
@@ -54,8 +62,8 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
 
 ### Compile by yourself
 
-1. Download the [Flink connector package](https://github.com/StarRocks/starrocks-connector-for-apache-flink).
-2. Execute the following command to compile the source code of Flink connector into a JAR file. Note that `flink_version` is replaced with the corresponding Flink version.
+1. Download the [Flink connector source code](https://github.com/StarRocks/starrocks-connector-for-apache-flink).
+2. Execute the following command to compile the source code of the Flink connector into a JAR file. Note that `flink_version` is replaced with the corresponding Flink version.
 
    ```bash
   sh build.sh <flink_version>
   ```
 
   For example:
 
   ```bash
   sh build.sh 1.15
   ```
@@ -67,7 +75,7 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
 
-3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar` , generated upon compilation.
+3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`, generated upon compilation.
 > **NOTE**
 >
@@ -77,30 +85,29 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
 
 | **Option**                        | **Required** | **Default value** | **Description** |
 |-----------------------------------|--------------|-------------------|-----------------|
-| connector | Yes | NONE | The connector that you want to use. The value must be "starrocks". |
-| jdbc-url | Yes | NONE | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresss, which must be separated by a comma (,). Format: `jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>`. |
-| load-url | Yes | NONE | The HTTP URL of the FE in your StarRocks cluster. You can specify multiple URLs, which must be separated by a semicolon (;). Format: `<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>`. |
+| connector | Yes | NONE | The value must be "starrocks". |
+| jdbc-url | Yes | NONE | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>`. |
+| load-url | Yes | NONE | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>`. |
 | database-name | Yes | NONE | The name of the StarRocks database into which you want to load data. |
-| table-name | Yes | NONE | The name of the table that you want to use to load data into StarRocks. |
-| username | Yes | NONE | The username of the account that you want to use to load data into StarRocks. |
+| table-name | Yes | NONE | The name of the table that you want to use to load data into StarRocks. |
+| username | Yes | NONE | The username of the account that you want to use to load data into StarRocks. The account needs [SELECT and INSERT privileges](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/account-management/GRANT). |
 | password | Yes | NONE | The password of the preceding account. |
-| sink.version | No | AUTO | Supported since 1.2.4. Choose which implementation to use. <br/>• `V1`: use [Stream Load](https://docs.starrocks.io/en-us/latest/loading/StreamLoad) interface to load data. Connectors before 1.2.4 only support this mode. <br/>• `V2`: use [Transaction Stream Load](https://docs.starrocks.io/en-us/latest/loading/Stream_Load_transaction_interface) interface to load data. It requires StarRocks to be at least version 2.4. Recommends `V2` because it optimizes the memory usage and provide a more stable exactly-once implementation. <br/>• `AUTO`: if the version of StarRocks supports transaction stream load, will choose `V2` automatically, otherwise choose `V1` |
+| sink.version | No | AUTO | The interface that is used to load data. This parameter is supported from Flink connector version 1.2.4 onwards. <br/>• `V1`: Use the [Stream Load](https://docs.starrocks.io/en-us/latest/loading/StreamLoad) interface to load data. Connectors before 1.2.4 only support this mode. <br/>• `V2`: Use the [Transaction Stream Load](https://docs.starrocks.io/en-us/latest/loading/Stream_Load_transaction_interface) interface to load data. It requires StarRocks to be at least version 2.4. `V2` is recommended because it optimizes memory usage and provides a more stable exactly-once implementation. <br/>• `AUTO`: If the version of StarRocks supports transaction Stream Load, the connector chooses `V2` automatically; otherwise, it chooses `V1`. |
 | sink.label-prefix | No | NONE | The label prefix used by Stream Load. |
-| sink.semantic | No | at-least-once | The semantics that is supported by your sink. Valid values: **at-least-once** and **exactly-once**. |
-| sink.buffer-flush.max-bytes | No | 94371840(90M) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Valid values: 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. |
-| sink.buffer-flush.max-rows | No | 500000 | The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. Only available for version `V1` when using at-least-once. Valid values: 64000 to 5000000. |
-| sink.buffer-flush.interval-ms | No | 300000 | The interval at which data is flushed. Only available for at-least-once mode. Valid values: 1000 to 3600000. Unit: ms. |
-| sink.max-retries | No | 3 | The number of times that the system retries to perform the Stream Load. Only available for sink version `V1` currently. Valid values: 0 to 10. |
+| sink.semantic | No | at-least-once | The semantics guaranteed by the sink. Valid values: **at-least-once** and **exactly-once**. |
+| sink.buffer-flush.max-bytes | No | 94371840(90M) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The maximum value ranges from 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. This parameter only takes effect when `sink.semantic` is set to `at-least-once`. If `sink.semantic` is set to `exactly-once`, the data in memory is flushed when a Flink checkpoint is triggered. In this circumstance, this parameter does not take effect. |
+| sink.buffer-flush.max-rows | No | 500000 | The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when `sink.version` is `V1` and `sink.semantic` is `at-least-once`. Valid values: 64000 to 5000000. |
+| sink.buffer-flush.interval-ms | No | 300000 | The interval at which data is flushed. This parameter is available only when `sink.semantic` is `at-least-once`. Valid values: 1000 to 3600000. Unit: ms. |
+| sink.max-retries | No | 3 | The number of times that the system retries to perform the Stream Load job. This parameter is available only when you set `sink.version` to `V1`. Valid values: 0 to 10. |
 | sink.connect.timeout-ms | No | 1000 | The timeout for establishing HTTP connection. Valid values: 100 to 60000. Unit: ms. |
-| sink.wait-for-continue.timeout-ms | No | 10000 | Supported since 1.2.7. The timeout for waiting response of HTTP 100-continue from FE. Valid values: 3000 to 600000. Unit: ms |
-| sink.ignore.update-before | No | true | Supported since version 1.2.8. Whether to ignore `UPDATE_BEFORE` records from Flink when loading data to primary key table. If false, will treat the record as a delete operation to StarRocks table. |
-| sink.properties.* | No | NONE | The parameters that are used to control Stream Load behavior. For example, the parameter `sink.properties.format` specifies the format used for stream load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD). |
-| sink.properties.format | No | csv | The format used for stream load. The connector will transform each batch of data to the format before sending them to StarRocks. Valid values: csv and json. |
-| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
+| sink.wait-for-continue.timeout-ms | No | 10000 | Supported since 1.2.7. The timeout for waiting for the HTTP 100-continue response from the FE. Valid values: `3000` to `600000`. Unit: ms. |
+| sink.ignore.update-before | No | true | Supported since version 1.2.8. Whether to ignore `UPDATE_BEFORE` records from Flink when loading data to Primary Key tables. If this parameter is set to false, the record is treated as a delete operation to the StarRocks table. |
+| sink.parallelism | No | NONE | The parallelism of loading. Only available for Flink SQL. If this parameter is not specified, the Flink planner decides the parallelism. **In the scenario of multi-parallelism, users need to guarantee data is written in the correct order.** |
+| sink.properties.* | No | NONE | The parameters that control Stream Load behavior. For example, the parameter `sink.properties.format` specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD). |
+| sink.properties.format | No | csv | The format used for Stream Load. The Flink connector transforms each batch of data to the format before sending them to StarRocks. Valid values: `csv` and `json`. |
 | sink.properties.column_separator | No | \t | The column separator for CSV-formatted data. |
-| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the stream load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: 0 to 1. Default value: 0. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
-| sink.parallelism | No | NONE | The parallelism of the connector. Only available for Flink SQL. If not set, Flink planner will decide the parallelism. In the scenario of multi-parallelism, users need to guarantee data is written in the correct order. |
-
+| sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
+| sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
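+
+For example, assuming the `score_board` table and the FE addresses that are used in the Examples section below, the following Flink SQL sketch shows how the required connection options and a couple of `sink.properties.*` options fit together to load JSON-formatted records. The option values are illustrative; adjust them to your own cluster and table.
+
+```SQL
+CREATE TABLE `score_board` (
+    `id` INT,
+    `name` STRING,
+    `score` INT,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'starrocks',
+    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+    'load-url' = '127.0.0.1:8030',
+    'database-name' = 'test',
+    'table-name' = 'score_board',
+    'username' = 'root',
+    'password' = '',
+    -- Pass Stream Load parameters through sink.properties.*:
+    -- send each batch as JSON and strip the outermost JSON array structure.
+    'sink.properties.format' = 'json',
+    'sink.properties.strip_outer_array' = 'true'
+);
+```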
 
 ## Data type mapping between Flink and StarRocks
 
@@ -127,24 +134,19 @@
 
 ## Usage notes
 
-When you load data from Apache Flink® into StarRocks, take note of the following points:
-
-- Loading data into StarRocks tables needs INSERT privilege. If you do not have the INSERT privilege, follow the instructions provided in [GRANT](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/account-management/GRANT) to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster.
-
 - Since v2.4, StarRocks provides a Stream Load transaction interface. Since Flink connector version 1.2.4, the sink is redesigned to implement exactly-once semantics based on transactional interfaces. Compared to the previous implementation based on non-transactional interfaces, the new implementation reduces memory usage and checkpoint overhead, thereby enhancing real-time performance and stability of loading. Starting from Flink connector version 1.2.4, the sink implements transactional interfaces by default. To enable non-transactional interfaces, `sink.version` needs to be configured as `V1`.
 
   > **NOTICE**
   >
   > If the version of StarRocks is earlier than 2.4 or the version of the Flink connector is earlier than 1.2.4, the sink automatically implements the non-transactional interface.
 
-- For the exactly-once semantics achieved by implementing non-transactional interface of Stream Load, it relies on Flink's checkpoint mechanism to save a small batch of data and its label at each checkpoint. In the first invocation after the completion of a checkpoint, it blocks to flush all data cached in the state, thus achieving precisely-once processing. However, if StarRocks unexpectedly exits, the operators for Apache Flink® sink streaming are blocked for a long time and Apache Flink® issues a monitoring alert or shuts down.
+- For the exactly-once semantics achieved by the non-transactional interface of Stream Load, the sink relies on Flink's checkpoint mechanism to save a small batch of data and its label at each checkpoint. In the first invocation after the completion of a checkpoint, it blocks to flush all data cached in the state, thus achieving exactly-once processing. However, if StarRocks unexpectedly exits, the operators for Apache Flink® sink streaming are blocked for a long time and Apache Flink® issues a monitoring alert or shuts down.
 
 - If data loading pauses, you can try to increase the memory of the Flink task.
 
 - If the preceding code runs as expected and StarRocks can receive data, but the data loading fails, check whether your machine can access the HTTP port of the backends (BEs) in your StarRocks cluster. If you can successfully ping the HTTP port returned by the execution of the SHOW BACKENDS command in your StarRocks cluster, your machine can access the HTTP port of the BEs in your StarRocks cluster. For example, a machine has a public IP address and a private IP address, the HTTP ports of frontends (FEs) and BEs can be accessed through the public IP address of the FEs and BEs, the IP address that is bound to your StarRocks cluster is the private IP address, and the value of `load-url` for the Flink task is the HTTP port of the public IP address of the FEs. The FEs forward the data loading task to the private IP address of the BEs. In this example, if the machine cannot ping the private IP address of the BEs, the data loading fails.
 
-- If you specify the value as exactly-once, `sink.buffer-flush.max-bytes`, `sink.buffer-flush.max-bytes`, and `sink.buffer-flush.interval-ms` are invalid.
+- If you set `sink.semantic` to `exactly-once`, `sink.buffer-flush.max-bytes`, `sink.buffer-flush.max-rows`, and `sink.buffer-flush.interval-ms` are invalid. Data in memory is instead flushed when a Flink checkpoint is triggered, as shown in the configuration sketch below.
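+
+For example, assuming a StarRocks cluster of version 2.4 or later and the `score_board` table from the Examples section below, the following Flink SQL sketch enables exactly-once loading in the Flink SQL client; data is then flushed to StarRocks whenever a checkpoint completes. The checkpoint interval and the label prefix value are illustrative.
+
+```SQL
+-- Exactly-once relies on Flink checkpoints, so enable periodic checkpointing first.
+SET 'execution.checkpointing.interval' = '10s';
+
+CREATE TABLE `score_board` (
+    `id` INT,
+    `name` STRING,
+    `score` INT,
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'starrocks',
+    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+    'load-url' = '127.0.0.1:8030',
+    'database-name' = 'test',
+    'table-name' = 'score_board',
+    'username' = 'root',
+    'password' = '',
+    -- Request exactly-once semantics.
+    'sink.semantic' = 'exactly-once',
+    -- Optional: a label prefix for the Stream Load jobs initiated by this sink.
+    'sink.label-prefix' = 'flink_score_board'
+);
+```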
## Examples @@ -173,9 +175,10 @@ DISTRIBUTED BY HASH(`id`); #### Set up Flink environment -* Download Flink binary [Flink 1.15.2](https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz), and unzip it to directory `flink-1.15.2` -* Download [connector 1.2.7](https://repo1.maven.org/maven2/com/starrocks/flink-connector-starrocks/1.2.7_flink-1.15/flink-connector-starrocks-1.2.7_flink-1.15.jar), and put it into the directory `flink-1.15.2/lib` -* Run the following commands to start a Flink cluster +- Download Flink binary [Flink 1.15.2](https://archive.apache.org/dist/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz), and unzip it to directory `flink-1.15.2`. +- Download [Flink connector 1.2.7](https://repo1.maven.org/maven2/com/starrocks/flink-connector-starrocks/1.2.7_flink-1.15/flink-connector-starrocks-1.2.7_flink-1.15.jar), and put it into the directory `flink-1.15.2/lib`. +- Run the following commands to start a Flink cluster: + ```shell cd flink-1.15.2 ./bin/start-cluster.sh @@ -183,12 +186,14 @@ DISTRIBUTED BY HASH(`id`); ### Run with Flink SQL -* Run the following command to start a Flink SQL client. +- Run the following command to start a Flink SQL client. + ```shell ./bin/sql-client.sh ``` -* Create a Flink table `score_board`, and insert values into the table via Flink SQL Client. -Note you must define the primary key in the Flink DDL if load to a StarRocks primary key table. It's optional for other types of StarRocks table. + +- Create a Flink table `score_board`, and insert values into the table via Flink SQL Client. +Note you must define the primary key in the Flink DDL if you want to load data into a Primary Key table of StarRocks. It's optional for other types of StarRocks tables. ```SQL CREATE TABLE `score_board` ( @@ -201,6 +206,7 @@ Note you must define the primary key in the Flink DDL if load to a StarRocks pri 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', 'load-url' = '127.0.0.1:8030', 'database-name' = 'test', + 'table-name' = 'score_board', 'username' = 'root', 'password' = '' @@ -211,22 +217,28 @@ Note you must define the primary key in the Flink DDL if load to a StarRocks pri ### Run with Flink DataStream -There are several ways to implement a Flink DataStream job according to the type of the input records, such as a csv Java `String`, a json Java `String` or a custom Java object. +There are several ways to implement a Flink DataStream job according to the type of the input records, such as a CSV Java `String`, a JSON Java `String` or a custom Java object. + +- The input records are CSV-format `String`. See [LoadCsvRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCsvRecords.java) for a complete example. -* The input records are csv-format `String`. See [LoadCsvRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCsvRecords.java) for a complete example. ```java - // Generate csv-format records. Each record has three fields separated by "\t". These - // fields correspond to the columns `id`, `name`, and `score` in StarRocks table. + /** + * Generate CSV-format records. Each record has three values separated by "\t". + * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table. 
+ */ String[] records = new String[]{ "1\tstarrocks-csv\t100", "2\tflink-csv\t100" }; DataStream source = env.fromElements(records); - // Configure the connector with the required properties, and you also need to add properties - // "sink.properties.format" and "sink.properties.column_separator" to tell the connector the - // input records are csv-format, and the separator is "\t". You can also use other separators - // in the records, but remember to modify the "sink.properties.column_separator" correspondingly + /** + * Configure the Flink connector with the required properties. + * You also need to add properties "sink.properties.format" and "sink.properties.column_separator" + * to tell the Flink connector the input records are CSV-format, and the column separator is "\t". + * You can also use other column separators in the CSV-format records, + * but remember to modify the "sink.properties.column_separator" correspondingly. + */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) @@ -237,24 +249,29 @@ There are several ways to implement a Flink DataStream job according to the type .withProperty("sink.properties.format", "csv") .withProperty("sink.properties.column_separator", "\t") .build(); - // Create the sink with the options + // Create the sink with the options. SinkFunction starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink); ``` -* The input records are json-format `String`. See [LoadJsonRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadJsonRecords.java) for a complete example. +- The input records are JSON-format `String`. See [LoadJsonRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadJsonRecords.java) for a complete example. + ```java - // Generate json-format records. Each record has three fields correspond to - // the columns `id`, `name`, and `score` in StarRocks table. + /** + * Generate JSON-format records. + * Each record has three key-value pairs corresponding to the columns `id`, `name`, and `score` in the StarRocks table. + */ String[] records = new String[]{ "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}", "{\"id\":2, \"name\":\"flink-json\", \"score\":100}", }; DataStream source = env.fromElements(records); - // Configure the connector with the required properties, and you also need to add properties - // "sink.properties.format" and "sink.properties.strip_outer_array" to tell the connector the - // input records are json-format. + /** + * Configure the Flink connector with the required properties. + * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array" + * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. + */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) @@ -265,14 +282,14 @@ There are several ways to implement a Flink DataStream job according to the type .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build(); - // Create the sink with the options + // Create the sink with the options. 
SinkFunction starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink); ``` -* The input records are custom Java objects. See [LoadCustomJavaRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java) for a complete example. +- The input records are custom Java objects. See [LoadCustomJavaRecords](https://github.com/StarRocks/starrocks-connector-for-apache-flink/tree/main/examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java) for a complete example. - * In this example, the input record is a simple POJO `RowData`. + - In this example, the input record is a simple POJO `RowData`. ```java public static class RowData { @@ -290,7 +307,8 @@ There are several ways to implement a Flink DataStream job according to the type } ``` - * The main program is + - The main program is as follows: + ```java // Generate records which use RowData as the container. RowData[] records = new RowData[]{ @@ -299,7 +317,7 @@ There are several ways to implement a Flink DataStream job according to the type }; DataStream source = env.fromElements(records); - // Configure the connector with the required properties + // Configure the Flink connector with the required properties. StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) @@ -309,49 +327,50 @@ There are several ways to implement a Flink DataStream job according to the type .withProperty("password", "") .build(); - // connector will use a Java object array (Object[]) to represent a row of - // StarRocks table, and each element is the value for a column. Need to - // define the schema of the Object[] which matches that of StarRocks table + /** + * The connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table, + * and each element is the value for a column. + * You need to define the schema of the Object[] which matches that of the StarRocks table. + */ TableSchema schema = TableSchema.builder() .field("id", DataTypes.INT().notNull()) .field("name", DataTypes.STRING()) .field("score", DataTypes.INT()) - // Must specify the primary key for StarRocks primary key table, - // and DataTypes.INT().notNull() for `id` must specify notNull() + // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`. .primaryKey("id") .build(); - // Transforms the RowData to the Object[] according to the schema + // Transform the RowData to the Object[] according to the schema. RowDataTransformer transformer = new RowDataTransformer(); - // Create the sink with schema, options, and transformer + // Create the sink with the schema, options, and transformer. SinkFunction starRockSink = StarRocksSink.sink(schema, options, transformer); source.addSink(starRockSink); ``` - * The `RowDataTransformer` in the main program is defined as + - The `RowDataTransformer` in the main program is defined as follows: + ```java private static class RowDataTransformer implements StarRocksSinkRowBuilder { /** * Set each element of the object array according to the input RowData. - * The schema of the array matches that of StarRocks table. + * The schema of the array matches that of the StarRocks table. 
*/ @Override public void accept(Object[] internalRow, RowData rowData) { internalRow[0] = rowData.id; internalRow[1] = rowData.name; internalRow[2] = rowData.score; - // Only need for StarRocks primary key table. Set the last - // element to tell whether the record is a UPSERT or DELETE. + // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation. internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); } } ``` -## Best Practices +## Best practices -### Load data to primary key table +### Load data to a Primary Key table -This section will show how to load data to StarRocks primary key table to achieve partial update, and conditional update. +This section will show how to load data to a StarRocks Primary Key table to achieve partial updates and conditional updates. You can see [Change data through loading](https://docs.starrocks.io/en-us/latest/loading/Load_to_Primary_Key_tables) for the introduction of those features. These examples use Flink SQL. @@ -378,116 +397,123 @@ DISTRIBUTED BY HASH(`id`); This example will show how to load data only to columns `id` and `name`. -1. Insert initial data to StarRocks table in MySQL client -```SQL -mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100); - -mysql> select * from score_board; -+------+-----------+-------+ -| id | name | score | -+------+-----------+-------+ -| 1 | starrocks | 100 | -| 2 | flink | 100 | -+------+-----------+-------+ -2 rows in set (0.02 sec) -``` +1. Insert two data rows into the StarRocks table `score_board` in MySQL client. + + ```SQL + mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100); + + mysql> select * from score_board; + +------+-----------+-------+ + | id | name | score | + +------+-----------+-------+ + | 1 | starrocks | 100 | + | 2 | flink | 100 | + +------+-----------+-------+ + 2 rows in set (0.02 sec) + ``` -2. Create a Flink table `score_board` in Flink SQL client - * Define the DDL which only includes columns `id` and `name` - * Set the option `sink.properties.partial_update` to `true` which tells the connector to do partial update - * If the connector version <= 1.2.7, also need to set the option `sink.properties.columns` to `id,name,__op` - which tells the connector the columns to update. Note you need append `__op` at the end, and this field is - used to specify UPSERT/DELETE operation, but its value is set by the connector automatically. - - ```SQL - CREATE TABLE `score_board` ( - `id` INT, - `name` STRING, - PRIMARY KEY (id) NOT ENFORCED - ) WITH ( - 'connector' = 'starrocks', - 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', - 'load-url' = '127.0.0.1:8030', - 'database-name' = 'test', - 'table-name' = 'score_board', - 'username' = 'root', - 'password' = '', - 'sink.properties.partial_update' = 'true', - -- only need for connector version <= 1.2.7 - 'sink.properties.columns' = 'id,name,__op' - ); - ``` -3. Insert data to the table in Flink SQL client, and only update the column `name` - ```SQL - INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update'); - ``` -4. Query the StarRocks table in mysql client - You can see that only values for `name` changes, and the values for `score` does not change. +2. Create a Flink table `score_board` in Flink SQL client. + + - Define the DDL which only includes the columns `id` and `name`. 
+ - Set the option `sink.properties.partial_update` to `true` which tells the Flink connector to perform partial updates. + - If the Flink connector version <= 1.2.7, you also need to set the option `sink.properties.columns` to `id,name,__op` to tells the Flink connector which columns need to be updated. Note that you need to append the field `__op` at the end. The field `__op` indicates that the data loading is an UPSERT or DELETE operation, and its values are set by the Flink connector automatically. + + ```SQL + CREATE TABLE `score_board` ( + `id` INT, + `name` STRING, + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'starrocks', + 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', + 'load-url' = '127.0.0.1:8030', + 'database-name' = 'test', + 'table-name' = 'score_board', + 'username' = 'root', + 'password' = '', + 'sink.properties.partial_update' = 'true', + -- only for Flink connector version <= 1.2.7 + 'sink.properties.columns' = 'id,name,__op' + ); + ``` + +3. Insert two data rows into the Flink table. The primary keys of the data rows are as same as these of rows in the StarRocks table. but the values in the column `name` are modified. + + ```SQL + INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update'); + ``` + +4. Query the StarRocks table in MySQL client. - ```SQL - mysql> select * from score_board; - +------+------------------+-------+ - | id | name | score | - +------+------------------+-------+ - | 1 | starrocks-update | 100 | - | 2 | flink-update | 100 | - +------+------------------+-------+ - 2 rows in set (0.02 sec) - ``` + ```SQL + mysql> select * from score_board; + +------+------------------+-------+ + | id | name | score | + +------+------------------+-------+ + | 1 | starrocks-update | 100 | + | 2 | flink-update | 100 | + +------+------------------+-------+ + 2 rows in set (0.02 sec) + ``` + + You can see that only values for `name` change, and the values for `score` do not change. #### Conditional update This example will show how to do conditional update according to the value of column `score`. The update for an `id` takes effect only when the new value for `score` is has a greater or equal to the old value. -1. Insert initial data to StarRocks table in MySQL client - ```SQL - mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100); +1. Insert two data rows into the StarRocks table in MySQL client. + + ```SQL + mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100); + + mysql> select * from score_board; + +------+-----------+-------+ + | id | name | score | + +------+-----------+-------+ + | 1 | starrocks | 100 | + | 2 | flink | 100 | + +------+-----------+-------+ + 2 rows in set (0.02 sec) + ``` + +2. Create a Flink table `score_board` in the following ways: - mysql> select * from score_board; - +------+-----------+-------+ - | id | name | score | - +------+-----------+-------+ - | 1 | starrocks | 100 | - | 2 | flink | 100 | - +------+-----------+-------+ - 2 rows in set (0.02 sec) - ``` - -2. 
Create a Flink table `score_board` in the following ways - * Define the DDL including all of columns - * Set the option `sink.properties.merge_condition` to `score` which tells the connector to use the column `score` - as the condition - * Set the option `sink.version` to `V1` which tells the connector to use stream load - - ```SQL - CREATE TABLE `score_board` ( - `id` INT, - `name` STRING, - `score` INT, - PRIMARY KEY (id) NOT ENFORCED - ) WITH ( - 'connector' = 'starrocks', - 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', - 'load-url' = '127.0.0.1:8030', - 'database-name' = 'test', - 'table-name' = 'score_board', - 'username' = 'root', - 'password' = '', - 'sink.properties.merge_condition' = 'score', - 'sink.version' = 'V1' - ); - ``` - -3. Insert data to the table in Flink SQL client, and update id 1 with a smaller score, and id 2 with a larger score - ```SQL - INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101); - ``` - -4. Query the StarRocks table in mysql client - You can see that only the row for id 2 changes, and the row for id 1 does not change. - ```SQL - mysql> select * from score_board; + - Define the DDL including all of columns. + - Set the option `sink.properties.merge_condition` to `score` to tell the Flink connector to use the column `score` + as the condition. + - Set the option `sink.version` to `V1` to tell the connector to use Stream Load interface. + + ```SQL + CREATE TABLE `score_board` ( + `id` INT, + `name` STRING, + `score` INT, + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'starrocks', + 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', + 'load-url' = '127.0.0.1:8030', + 'database-name' = 'test', + 'table-name' = 'score_board', + 'username' = 'root', + 'password' = '', + 'sink.properties.merge_condition' = 'score', + 'sink.version' = 'V1' + ); + ``` + +3. Insert two data rows into the Flink table. The primary keys of the data rows are as same as these of rows in the StarRocks table. The first data row has a smaller value in the column `score`, and the second data row has a larger value in the column `score`. + + ```SQL + INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101); + ``` + +4. Query the StarRocks table in MySQL client. + + ```SQL + mysql> select * from score_board; +------+--------------+-------+ | id | name | score | +------+--------------+-------+ @@ -495,14 +521,16 @@ takes effect only when the new value for `score` is has a greater or equal to th | 2 | flink-update | 101 | +------+--------------+-------+ 2 rows in set (0.03 sec) - ``` + ``` + + You can see that only the values of the second data row change, and the values of the first data row do not change. ### Load data into columns of BITMAP type [`BITMAP`](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-types/BITMAP) is often used to accelerate count distinct, such as counting UV, see [Use Bitmap for exact Count Distinct](https://docs.starrocks.io/en-us/latest/using_starrocks/Using_bitmap). Here we take the counting of UV as an example to show how to load data into columns of the `BITMAP` type. -1. Create a StarRocks Aggregate table +1. Create a StarRocks Aggregate table in MySQL client. In the database `test`, create an Aggregate table `page_uv` where the column `visit_users` is defined as the `BITMAP` type and configured with the aggregate function `BITMAP_UNION`. 
@@ -516,13 +544,12 @@ Here we take the counting of UV as an example to show how to load data into colu DISTRIBUTED BY HASH(`page_id`); ``` -2. Create a Flink table in Flink SQL client - - `visit_user_id` is `BIGINT` in Flink, and we want to load it to the column `visit_users` of StarRocks table. Note that for the Flink DDL - * define the `visit_user_id` instead of `visit_users` because Flink does not support `BITMAP` - * set the option `sink.properties.columns` to `page_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id)` which tells the connector the column mapping - between Flink table and StarRocks table. It uses the [`to_bitmap`](https://docs.starrocks.io/en-us/latest/sql-reference/sql-functions/bitmap-functions/to_bitmap) - function to convert the data of `BIGINT` type into `BITMAP` type. +2. Create a Flink table in Flink SQL client. + + The column `visit_user_id` in the Flink table is of `BIGINT` type, and we want to load this column to the column `visit_users` of `BITMAP` type in the StarRocks table. So when defining the DDL of the Flink table, note that: + - Because Flink does not support `BITMAP`, you need to define a column `visit_user_id` as `BIGINT` type to represent the column `visit_users` of `BITMAP` type in the StarRocks table. + - You need to set the option `sink.properties.columns` to `page_id,visit_date,user_id,visit_users=to_bitmap(visit_user_id)`, which tells the Flink connector the column mapping beween the Flink table and StarRocks table. Also you need to use [`to_bitmap`](https://docs.starrocks.io/en-us/latest/sql-reference/sql-functions/bitmap-functions/to_bitmap) + function to tell the Flink connector to convert the data of `BIGINT` type into `BITMAP` type. ```SQL CREATE TABLE `page_uv` ( @@ -531,8 +558,8 @@ Here we take the counting of UV as an example to show how to load data into colu `visit_user_id` BIGINT ) WITH ( 'connector' = 'starrocks', - 'jdbc-url' = 'jdbc:mysql://127.0.0.1:11903', - 'load-url' = '127.0.0.1:11901', + 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', + 'load-url' = '127.0.0.1:8030', 'database-name' = 'test', 'table-name' = 'page_uv', 'username' = 'root', @@ -541,7 +568,7 @@ Here we take the counting of UV as an example to show how to load data into colu ); ``` -3. Load data into Flink table in Flink SQL client +3. Load data into Flink table in Flink SQL client. ```SQL INSERT INTO `page_uv` VALUES @@ -585,13 +612,11 @@ Here we take the counting of UV as an example to show how to load data into colu DISTRIBUTED BY HASH(`page_id`); ``` -2. Create a Flink table in Flink SQL client +2. Create a Flink table in Flink SQL client. - `visit_user_id` is `BIGINT` in Flink, and we want to load it to the column `visit_users` of StarRocks table. Note that for the Flink DDL - * define the `visit_user_id` instead of `visit_users` because Flink does not support `BITMAP` - * set the option `sink.properties.columns` to `page_id,visit_date,user_id,visit_users=hll_hash(visit_user_id)` which tells the connector the column mapping - between Flink table and StarRocks table. It uses the [`hll_hash`](https://docs.starrocks.io/en-us/latest/sql-reference/sql-functions/aggregate-functions/hll_hash) - function to convert the data of `BIGINT` type into `HLL` type. + The column `visit_user_id` in the Flink table is of `BIGINT` type, and we want to load this column to the column `visit_users` of `HLL` type in the StarRocks table. 
So when defining the DDL of the Flink table, note that: + - Because Flink does not support `BITMAP`, you need to define a column `visit_user_id` as `BIGINT` type to represent the column `visit_users` of `HLL` type in the StarRocks table. + - You need to set the option `sink.properties.columns` to `page_id,visit_date,user_id,visit_users=hll_hash(visit_user_id)` which tells the Flink connector the column mapping between Flink table and StarRocks table. Also you need to use [`hll_hash`](https://docs.starrocks.io/en-us/latest/sql-reference/sql-functions/aggregate-functions/hll_hash) function to tell the Flink connector to convert the data of `BIGINT` type into `HLL` type. ```SQL CREATE TABLE `hll_uv` ( @@ -610,7 +635,7 @@ Here we take the counting of UV as an example to show how to load data into colu ); ``` -3. Load data into Flink table in Flink SQL client +3. Load data into Flink table in Flink SQL client. ```SQL INSERT INTO `hll_uv` VALUES