Add example for using Flink File cache #80

Open · wants to merge 1 commit into base: main
19 changes: 19 additions & 0 deletions java/FileCache/README.md
@@ -0,0 +1,19 @@
# Flink File Cache usage example

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Flink connectors: DataGeneratorSource, DiscardingSink


This example demonstrates how to use Flink's distributed cache to copy files over from the JobManager and distribute them to the TaskManager worker nodes.
Reviewer comment (Contributor): We need to explain clearly when this is useful, because this example doesn't show using the certificate; it just copies it. In particular, we are showing downloading a cacert and not just a random file, so we need to explain when this is useful.


In this example, we download a `cacerts` file for a custom trust store from S3, and configure the TaskManager JVM to use this new trust store location.

We could also download the file directly from S3 onto each TaskManager. However, using the file cache is a more Flink-native way to do this, and it ensures the file is available even across job restarts.
Reviewer comment (Contributor): The example actually downloads the file from S3, in the main(). Why is this different from downloading it from S3 in the open() of the RichMapFunction, downloading into each TaskManager?


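For orientation, here is a minimal sketch of the two-step flow this example uses (identifiers are illustrative; the full code in this PR registers the file under `DISTRIBUTED_CACHE_RESOURCE_NAME` after downloading it from S3):

```java
// 1. On the client/JobManager side, in main(): register a local file with the
//    distributed cache under a logical name.
env.registerCachedFile("/tmp/cacerts", "cacerts-file");

// 2. On the TaskManager side, in the open() of any rich function: retrieve the
//    locally materialized copy of the file by the same logical name.
File cachedFile = getRuntimeContext().getDistributedCache().getFile("cacerts-file");
```
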
### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.
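
As the first review comment notes, the example copies the certificate bundle but never uses it. Below is a hedged sketch (not part of this PR) of one way a consumer could use the copied file: building an `SSLContext` explicitly from the local trust store rather than relying only on the `javax.net.ssl.trustStore` system property. The class name, path, and password are assumptions; `changeit` is the conventional default password for a JDK `cacerts` file.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;

public final class CustomTrustStoreClient {

    // Build an SSLContext from the trust store copied to the TaskManager.
    // Assumes a JKS-format cacerts file; adjust the keystore type for PKCS12 bundles.
    static SSLContext sslContextFrom(String trustStorePath, char[] password) throws Exception {
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream(trustStorePath)) {
            trustStore.load(in, password);
        }
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext;
    }
}
```

A caller would then use, for example, `sslContextFrom("/tmp/cacerts", "changeit".toCharArray())` when configuring an HTTPS client.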
129 changes: 129 additions & 0 deletions java/FileCache/pom.xml
@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>file-cache</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<jar.finalName>${project.name}-${project.version}</jar.finalName>
<target.java.version>11</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<flink.version>1.20.0</flink.version>
<log4j.version>2.23.1</log4j.version>
</properties>

<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Add S3 dependency -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.25.27</version>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

<build>
<finalName>${jar.finalName}</finalName>

<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.amazonaws.services.msf.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
55 changes: 55 additions & 0 deletions java/FileCache/src/main/java/com/amazonaws/services/msf/CopyingPassThroughFunction.java
@@ -0,0 +1,55 @@
package com.amazonaws.services.msf;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

import static com.amazonaws.services.msf.StreamingJob.DISTRIBUTED_CACHE_RESOURCE_NAME;

public class CopyingPassThroughFunction<T> extends RichMapFunction<T, T> {

private static final Logger LOG = LoggerFactory.getLogger(CopyingPassThroughFunction.class);


private static final String TASKMANAGER_LOCAL_FILE_LOCATION = "/tmp/cacerts";

@Override
public void open(OpenContext openContext) throws Exception {
// Perform setup steps here
// Copy the file from the distributed cache to a local path on the TaskManager if it doesn't already exist
// The file was registered on the JobManager as part of the "main()" function
File localFile = new File(TASKMANAGER_LOCAL_FILE_LOCATION);

// Skip the copy if the file already exists locally
if (!localFile.exists()) {
// Create the parent directory; the target path itself must be a file, not a directory
localFile.getParentFile().mkdirs();
File cachedFile = getRuntimeContext().getDistributedCache().getFile(DISTRIBUTED_CACHE_RESOURCE_NAME);
Files.copy(cachedFile.toPath(), localFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
LOG.info("Copied over resource file {} to TaskManager location {}", DISTRIBUTED_CACHE_RESOURCE_NAME, TASKMANAGER_LOCAL_FILE_LOCATION);
} else {
LOG.info("Skipping resource copy for resource {} on taskmanager as the file already exists: {}", DISTRIBUTED_CACHE_RESOURCE_NAME, TASKMANAGER_LOCAL_FILE_LOCATION);
}
Reviewer comment (Contributor), on lines +29 to +37: Could we come up with a utility class that encapsulates downloading of the necessary certificates? This way the initialization logic could be a one-liner that can be included by any operator that requires the files. Ideally, this should be an idempotent process that can run multiple times without side effects.


// Perform any custom operations on the copied file here. For example, reconfiguring the trust store of the JVM
LOG.info("Trust store location before modification: {}", System.getProperty("javax.net.ssl.trustStore"));
if (!TASKMANAGER_LOCAL_FILE_LOCATION.equals(System.getProperty("javax.net.ssl.trustStore"))) {
System.setProperty("javax.net.ssl.trustStore", TASKMANAGER_LOCAL_FILE_LOCATION);
Reviewer comment (Contributor): I guess this trust store is supposed to be used to initialize another operator, like a source or a sink. How does this guarantee that the open() of this operator happens before the open() of the operator that expects the certificate?

LOG.info("Trust store location after modification : " + System.getProperty("javax.net.ssl.trustStore"));
} else {
LOG.info("Trust store already pointing to right location. Skipping modification");
}
}


@Override
public T map(T t) throws Exception {
// simply pass through
return t;
}
}
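
Responding to the two review comments above (the utility-class suggestion and the open() ordering question), here is a possible sketch of an idempotent, thread-safe helper. Every operator that needs the trust store would call it from its own open(), so correctness no longer depends on open() ordering across operators. The class and method names are hypothetical, not part of this PR, and it assumes mutating JVM-wide state via the system property is acceptable:

```java
package com.amazonaws.services.msf;

import org.apache.flink.api.common.functions.RuntimeContext;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public final class TrustStoreInitializer {

    private TrustStoreInitializer() {
    }

    /**
     * Idempotent: copies the cached file at most once per TaskManager JVM and only sets
     * the system property when it is not already pointing at the local trust store.
     */
    public static synchronized void ensureLocalTrustStore(RuntimeContext ctx,
                                                          String cacheResourceName,
                                                          String localPath) throws Exception {
        File localFile = new File(localPath);
        if (!localFile.exists()) {
            // Materialize the file from Flink's distributed cache to a stable local path
            File cachedFile = ctx.getDistributedCache().getFile(cacheResourceName);
            localFile.getParentFile().mkdirs();
            Files.copy(cachedFile.toPath(), localFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        if (!localPath.equals(System.getProperty("javax.net.ssl.trustStore"))) {
            System.setProperty("javax.net.ssl.trustStore", localPath);
        }
    }
}
```

With a helper like this, the body of CopyingPassThroughFunction.open() collapses to a one-liner, and any source or sink operator that needs the certificate can make the same call in its own open().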
72 changes: 72 additions & 0 deletions java/FileCache/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,72 @@
package com.amazonaws.services.msf;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.File;


public class StreamingJob {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

private static final String JOBMANAGER_LOCAL_FILE_DIRECTORY = "/tmp/";

public static final String DISTRIBUTED_CACHE_RESOURCE_NAME = "cacerts-file";

private static DataGeneratorSource<Long> createDatagenSource() {
return new DataGeneratorSource<>(
i -> i,
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
TypeInformation.of(Long.class));
}

private static void downloadFileToDirectory(String s3BucketName, String s3FileKey, String targetDir) throws Exception {
File tmpLocalDirectoryLocation = new File(targetDir);
if (!tmpLocalDirectoryLocation.exists()) {
tmpLocalDirectoryLocation.mkdirs();
}

File targetFile = new File(targetDir, s3FileKey);
if (targetFile.exists()) {
// The S3 download fails if the destination file already exists (e.g., after a restart)
targetFile.delete();
}

try (S3Client s3 = S3Client.create()) {
s3.getObject(GetObjectRequest.builder()
.bucket(s3BucketName)
.key(s3FileKey)
.build(), targetFile.toPath());
LOG.info("Copied S3 file s3://{}/{} to location {}", s3BucketName, s3FileKey, targetFile);
}
}

public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Download file to temporary directory on the JobManager
String s3BucketName = "my-bucket-name"; // replace with your S3 bucket name
String s3FileKey = "file-key"; // replace with your S3 object key
downloadFileToDirectory(s3BucketName, s3FileKey, JOBMANAGER_LOCAL_FILE_DIRECTORY);
// Register the file with the Flink distributed cache so that it can be accessed from the TaskManagers
env.registerCachedFile(new File(JOBMANAGER_LOCAL_FILE_DIRECTORY, s3FileKey).getPath(), DISTRIBUTED_CACHE_RESOURCE_NAME);


env.fromSource(createDatagenSource(),
WatermarkStrategy.noWatermarks(),
"Datagen source",
TypeInformation.of(Long.class))
// Add a pass-through function to retrieve the file from the distributed cache and copy it to a TaskManager-local path
.map(new CopyingPassThroughFunction<>())
.sinkTo(new DiscardingSink<>());

env.execute("File Cache usage example");
}
}
7 changes: 7 additions & 0 deletions java/FileCache/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
1 change: 1 addition & 0 deletions java/pom.xml
@@ -19,6 +19,7 @@
<module>AvroGlueSchemaRegistryKafka</module>
<module>AvroGlueSchemaRegistryKinesis</module>
<module>CustomMetrics</module>
<module>FileCache</module>
<module>GettingStarted</module>
<module>GettingStartedTable</module>
<module>IcebergDatastreamSink</module>