This repository supports related articles and publications around using Apache Flink and CrateDB, and is part of the Building industrial IoT applications with open-source components and CrateDB reference architecture series.
It is supplemented by a corresponding executable end-to-end tutorial for Apache Kafka, Apache Flink, and CrateDB, which easily provides a Kafka/Flink infrastructure on your workstation based on Docker Compose, pulls the Flink job JAR file from the assets on the release page, and submits it to the Flink cluster.
- Article: Build a data ingestion pipeline using Kafka, Flink, and CrateDB
- Forum: CrateDB Community Day #2
- Recording: An introduction to Apache Flink and Flink SQL by Timo Walther, Principal Software Engineer at Apache Flink
- Recording: Live demo of Apache Flink with CrateDB as source or sink by Marios Trivyzas, Senior Software Engineer at CrateDB
To learn more about the technologies used here, please follow up with the excellent documentation and resources around Apache Flink.
- Apache Flink Studies
- Apache Flink SQL Cookbook
- Flink » Examples » Batch
- Flink » Examples » DataStream
- Flink » Examples » Table
Most of the Flink jobs demonstrated here connect to CrateDB through the Flink JDBC Connector, with both the vanilla PostgreSQL JDBC driver and the CrateDB adapter/dialect.
The first two jobs, both defined in `io.crate.flink.demo`, can be launched as standalone Java applications, without the need to submit them to a Flink cluster. The other job, defined in `io.crate.streaming`, is meant to be submitted as a job to a Flink cluster for demonstration purposes, but can also be invoked interactively.
The `SimpleJdbcSinkJob` demonstrates a basic example which inserts a few records into CrateDB using Flink. It outlines how to use the fundamental Flink `JdbcSink` API, and how to adjust the corresponding `JdbcExecutionOptions` and `JdbcConnectionOptions`.

The
`SimpleTableApiJob` demonstrates how to use the Flink Table API and the Flink DataStream API.

The Flink Table API is a language-integrated query API for Java, Scala, and Python. It allows composing queries from relational operators such as selection, filter, and join in a very intuitive way, and offers a unified interface for both stream and batch processing.
The Flink DataStream API offers the primitives of stream processing (namely time, state, and dataflow management) in a relatively low-level imperative programming API. The Table API abstracts away many internals and provides a structured and declarative API.
Both APIs can work with bounded and unbounded streams. Bounded streams come into play when processing historical data; unbounded streams occur in real-time processing scenarios, which might be initialized with historical data first.
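As an illustration of the Table API's relational style, here is a minimal, hypothetical sketch in batch mode. It is not taken from the repository's jobs and assumes the Flink Table API dependencies are on the classpath; the column names are made up:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiSketch {
    public static void main(String[] args) {
        // Batch mode: a bounded program that terminates when its input is exhausted.
        // The same relational operators work on unbounded streams in streaming mode.
        TableEnvironment env = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());

        // Compose a query from relational operators: selection and filter.
        Table result = env.fromValues(
                Row.of("sensor-1", 42.0),
                Row.of("sensor-2", 17.5))
            .as("id", "reading")
            .filter($("reading").isGreater(20.0))
            .select($("id"));

        result.execute().print();
    }
}
```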
The `TaxiRidesStreamingJob` subscribes to an Apache Kafka topic as a data source, and stores received data into CrateDB as a data sink. The data stream is represented by records from the venerable NYC Yellow Taxi Trips dataset.
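The shape of such a Kafka-to-CrateDB job can be sketched with Flink's `KafkaSource` and `JdbcSink`. This is a hypothetical sketch, not the actual job: the topic, table, and column names are assumptions, and it requires the Flink Kafka and JDBC connectors on the classpath:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToCrateDbSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Subscribe to the Kafka topic carrying taxi ride records.
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("rides")
            .setGroupId("default")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
            // Store each raw record into CrateDB; a real job parses fields first.
            .addSink(JdbcSink.sink(
                "INSERT INTO doc.taxi_rides (payload) VALUES (?)",
                (statement, record) -> statement.setString(1, record),
                JdbcExecutionOptions.builder()
                    .withBatchIntervalMs(5000)  // periodic flushing, cf. batch.interval.ms
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("io.crate.client.jdbc.CrateDriver")
                    .withUrl("jdbc:crate://localhost:5432/")
                    .withUsername("crate")
                    .build()));

        env.execute("taxi-rides-sketch");
    }
}
```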
Acquire and build the source code:

```shell
git clone https://github.com/crate/cratedb-flink-jobs
cd cratedb-flink-jobs
make build
```
`make test` will probe the job using `flink info`. `make submit` will submit the job using `flink run` to a Flink cluster at `localhost:8081`.
Run the standalone jobs:

```shell
make run JOB=SimpleJdbcSinkJob
make run JOB=SimpleTableApiJob
```
`SimpleJdbcSinkJob` uses the PostgreSQL JDBC driver:

- Driver class: `org.postgresql.Driver`
- URL schema: `postgresql://`

The CrateDB JDBC driver:

- Driver class: `io.crate.client.jdbc.CrateDriver`
- URL schema: `crate://`
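Both drivers speak the PostgreSQL wire protocol, so connecting from plain JDBC looks similar either way. A minimal, hypothetical sketch using the vanilla PostgreSQL driver, assuming CrateDB listens on `localhost:5432` with the default `crate` user (note the `jdbc:` prefix in JDBC URLs):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class JdbcConnectSketch {
    public static void main(String[] args) throws Exception {
        // Vanilla PostgreSQL driver (org.postgresql.Driver). With the CrateDB
        // driver on the classpath, "jdbc:crate://localhost:5432/" works analogously.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:postgresql://localhost:5432/doc", "crate", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT name FROM sys.cluster")) {
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        }
    }
}
```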
These are the settings for the `TaxiRidesStreamingJob`.
| Setting | Description |
|---|---|
| `kafka.servers` | Comma-separated list of Kafka brokers to connect to. |
| `kafka.topic` | Kafka topic to consume. |
| `crate.hosts` | Comma-separated list of CrateDB hosts. The format is `<hostname>:<psql_port>[, ...]`. Example: `crate-01.example.net:5432,crate-02.example.net:5432` |
| `crate.table` | CrateDB table name. |
| Setting | Default | Description |
|---|---|---|
| `kafka.group.id` | `default` | Kafka consumer group ID. |
| `kafka.offset` | `earliest` | Kafka topic offset. |
| `batch.interval.ms` | `5000` | Timeout in milliseconds to use for periodic flushing. |
| `crate.schema` | `doc` | CrateDB schema. |
| `crate.user` | `crate` | CrateDB user. |
| `crate.password` | `<empty>` | CrateDB user password. |
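For illustration, the defaults above could be modeled with a small, hypothetical settings parser that accepts `--key value` pairs; the actual job may parse its parameters differently (e.g. via Flink's `ParameterTool`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects "--key value" command-line pairs into a map,
// pre-populated with the documented defaults for the optional settings.
public class JobSettings {
    static Map<String, String> parse(String[] args) {
        Map<String, String> settings = new HashMap<>();
        // Defaults as documented in the table above.
        settings.put("kafka.group.id", "default");
        settings.put("kafka.offset", "earliest");
        settings.put("batch.interval.ms", "5000");
        settings.put("crate.schema", "doc");
        settings.put("crate.user", "crate");
        settings.put("crate.password", "");
        // Override defaults and add mandatory settings from the command line.
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                settings.put(args[i].substring(2), args[i + 1]);
            }
        }
        return settings;
    }
}
```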