[Tool] Add examples for sink with DataStream api
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Aug 10, 2023
1 parent f84a298 commit 9b6480d
Showing 5 changed files with 440 additions and 0 deletions.
94 changes: 94 additions & 0 deletions examples/pom.xml
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-examples</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.minor.version>1.15</flink.minor.version>
<flink.version>1.15.0</flink.version>
<scala.version>2.12</scala.version>
<connector.version>1.2.7</connector.version>
<log4j.version>2.17.1</log4j.version>
</properties>

<dependencies>
<!-- connector dependency for Flink version >= 1.15 -->
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector.version}_flink-${flink.minor.version}</version>
</dependency>

<!-- connector dependency for Flink version <= 1.14; also set flink.minor.version (e.g. 1.14) and scala.version accordingly -->
<!-- <dependency>-->
<!-- <groupId>com.starrocks</groupId>-->
<!-- <artifactId>flink-connector-starrocks</artifactId>-->
<!-- <version>${connector.version}_flink-${flink.minor.version}_${scala.version}</version>-->
<!-- <scope>provided</scope>-->
<!-- </dependency>-->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>

</dependencies>
</project>
89 changes: 89 additions & 0 deletions examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCsvRecords.java
@@ -0,0 +1,89 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.examples.datastream;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
* This example shows how to load records to a StarRocks table using the Flink DataStream API.
* Each record is a CSV-format {@link String} in Flink and will be loaded as a row of the StarRocks table.
*/
public class LoadCsvRecords {

public static void main(String[] args) throws Exception {
// To run the example, complete the following preparations:
// 1. Create a primary key table in your StarRocks cluster. The DDL is
// CREATE DATABASE `test`;
// CREATE TABLE `test`.`score_board`
// (
// `id` int(11) NOT NULL COMMENT "",
// `name` varchar(65533) NULL DEFAULT "" COMMENT "",
// `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
// )
// ENGINE=OLAP
// PRIMARY KEY(`id`)
// COMMENT "OLAP"
// DISTRIBUTED BY HASH(`id`)
// PROPERTIES(
// "replication_num" = "1"
// );
//
// 2. Replace the connector options "jdbc-url" and "load-url" with your cluster configuration
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030");
String loadUrl = params.get("loadUrl", "127.0.0.1:8030");

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
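
// A hedged note (not part of the original example): with the default at-least-once
// semantics the connector flushes on buffer thresholds such as
// "sink.buffer-flush.interval-ms". If you set "sink.semantic" to "exactly-once",
// data is committed at Flink checkpoints, so checkpointing must be enabled, e.g.
// env.enableCheckpointing(10000L);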

// Generate csv-format records. Each record has three fields separated by "\t". These
// fields correspond to the columns `id`, `name`, and `score` in the StarRocks table.
String[] records = new String[]{
"1\tstarrocks-csv\t100",
"2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);

// Configure the connector with the required properties. You also need to set the
// properties "sink.properties.format" and "sink.properties.column_separator" to tell
// the connector that the input records are CSV-format and that the separator is "\t".
// You can use other separators in the records, but remember to modify
// "sink.properties.column_separator" accordingly.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "csv")
.withProperty("sink.properties.column_separator", "\t")
.build();
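
// A hedged alternative (not part of the original example): if your records are
// JSON strings rather than CSV, you could configure the connector as below.
// "sink.properties.*" options are passed through to StarRocks Stream Load, and
// "strip_outer_array" is set because the rows of one load are wrapped in a JSON array.
// StarRocksSinkOptions jsonOptions = StarRocksSinkOptions.builder()
//         .withProperty("jdbc-url", jdbcUrl)
//         .withProperty("load-url", loadUrl)
//         .withProperty("database-name", "test")
//         .withProperty("table-name", "score_board")
//         .withProperty("username", "root")
//         .withProperty("password", "")
//         .withProperty("sink.properties.format", "json")
//         .withProperty("sink.properties.strip_outer_array", "true")
//         .build();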
// Create the sink with the options
SinkFunction<String> starRocksSink = StarRocksSink.sink(options);
source.addSink(starRocksSink);

env.execute("LoadCsvRecords");
}
}
142 changes: 142 additions & 0 deletions examples/src/main/java/com/starrocks/connector/flink/examples/datastream/LoadCustomJavaRecords.java
@@ -0,0 +1,142 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.examples.datastream;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

/**
* This example shows how to load records to a StarRocks table using the Flink DataStream API.
* Each record is a user-defined Java object, {@link RowData}, in Flink, and will be loaded
* as a row of the StarRocks table.
*/
public class LoadCustomJavaRecords {

public static void main(String[] args) throws Exception {
// To run the example, complete the following preparations:
// 1. Create a primary key table in your StarRocks cluster. The DDL is
// CREATE DATABASE `test`;
// CREATE TABLE `test`.`score_board`
// (
// `id` int(11) NOT NULL COMMENT "",
// `name` varchar(65533) NULL DEFAULT "" COMMENT "",
// `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
// )
// ENGINE=OLAP
// PRIMARY KEY(`id`)
// COMMENT "OLAP"
// DISTRIBUTED BY HASH(`id`)
// PROPERTIES(
// "replication_num" = "1"
// );
//
// 2. Replace the connector options "jdbc-url" and "load-url" with your cluster configuration
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String jdbcUrl = params.get("jdbcUrl", "jdbc:mysql://127.0.0.1:9030");
String loadUrl = params.get("loadUrl", "127.0.0.1:8030");

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Generate records that use RowData as the container.
RowData[] records = new RowData[]{
new RowData(1, "starrocks-rowdata", 100),
new RowData(2, "flink-rowdata", 100),
};
DataStream<RowData> source = env.fromElements(records);

// Configure the connector with the required properties
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.build();

// The connector uses a Java object array (Object[]) to represent a row of the
// StarRocks table, and each element is the value for a column. You need to define
// the schema of the Object[] so that it matches that of the StarRocks table.
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT().notNull())
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
// The primary key must be specified for a StarRocks primary key table, which
// is why `id` is declared as DataTypes.INT().notNull() above
.primaryKey("id")
.build();
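// A hedged note (not part of the original example): for a primary key table the
// connector appends one extra element to the Object[] for the operation flag,
// so the array length is the number of columns plus one (see the transformer below).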
// Transform the RowData into the Object[] according to the schema
RowDataTransformer transformer = new RowDataTransformer();
// Create the sink with schema, options, and transformer
SinkFunction<RowData> starRocksSink = StarRocksSink.sink(schema, options, transformer);
source.addSink(starRocksSink);

env.execute("LoadCustomJavaRecords");
}

/**
* A simple POJO that includes three fields: id, name, and score,
* which match the schema of the StarRocks table `score_board`.
*/
public static class RowData {
public int id;
public String name;
public int score;

public RowData() {}

public RowData(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}

/**
* Transforms the input {@link RowData} into an Object[], which is the internal
* representation of a row in the StarRocks table.
*/
private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

/**
* Set each element of the object array according to the input RowData.
* The schema of the array matches that of the StarRocks table.
*/
@Override
public void accept(Object[] internalRow, RowData rowData) {
internalRow[0] = rowData.id;
internalRow[1] = rowData.name;
internalRow[2] = rowData.score;
// Only needed for a StarRocks primary key table. Set the last element to
// indicate whether the record is an UPSERT or a DELETE.
internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
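
/**
* A hedged sketch (not part of the original example): a transformer that marks
* each record as a DELETE instead of an UPSERT, so rows with matching primary
* keys are removed from the table. It assumes the same Object[] layout as above.
*/
private static class RowDataDeleteTransformer implements StarRocksSinkRowBuilder<RowData> {

@Override
public void accept(Object[] internalRow, RowData rowData) {
internalRow[0] = rowData.id;
internalRow[1] = rowData.name;
internalRow[2] = rowData.score;
// DELETE removes the row whose primary key matches this record
internalRow[internalRow.length - 1] = StarRocksSinkOP.DELETE.ordinal();
}
}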

}