Add example for using Flink File cache #80

Open: hlteoh37 wants to merge 1 commit into main

Contributor

@hlteoh37 hlteoh37 commented Nov 28, 2024

Purpose of the change

Add example for using Flink filecache

Verifying this change

Run the example on MSF:

  1. Set up an S3 file with cacerts copied directly from the JVM cacerts.
  2. Run the app and validate that the logs show the trust store configured correctly.
  3. Check the TM instances and verify that the file has been copied to /tmp/cacerts.

Verified that the S3 download happens successfully on the JM.
[Screenshot: SCR-20241128-owfp]

Verified that the trust store is reconfigured successfully on the TM.
[Screenshot: SCR-20241128-owcz]

Verified that the cacerts file is present on the TM host.
[Screenshot: image]

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterward, for convenience.)

  • Completely new example
  • Updated an existing example to newer Flink version or dependencies versions
  • Improved an existing example
  • Modified the runtime configuration of an existing example (i.e. added/removed/modified any runtime properties)
  • Modified the expected input or output of an existing example (e.g. modified the source or sink, modified the record schema)

Contributor

@nicusX nicusX left a comment

This example has a problem: it shows a hack without explaining when it is useful or how to use it. This is going to be very confusing for users.
This was a problem with the old example repos: they showed hacks without real explanations.

We set a truststore but we do not show how it is used.

Also, is it guaranteed that javax.net.ssl.trustStore points to the downloaded truststore when another operator is going to use it?
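
One way to sidestep this question is for the operator that needs the certificates to build its own `SSLContext` from the file, instead of relying on the JVM-wide `javax.net.ssl.trustStore` property. As far as I know, the default `SSLContext` is initialized once per JVM and reads the property at that point, so setting it later may have no effect on code that already obtained the default context. The following is only a sketch using JDK classes; the class name `TrustStoreLoader`, the method name, and the way the password is passed are hypothetical and not part of this PR.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper (not part of this PR): builds an SSLContext from an explicit trust store file. */
final class TrustStoreLoader {

    private TrustStoreLoader() {}

    /**
     * Loads the key store at trustStorePath and returns a TLS context that trusts
     * only the certificates in that file. No system property is read or modified.
     */
    static SSLContext sslContextFrom(Path trustStorePath, char[] password) throws Exception {
        // Assumption: the file is in the JVM's default key store format (as a copied cacerts would be).
        KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
        try (InputStream in = Files.newInputStream(trustStorePath)) {
            trustStore.load(in, password);
        }

        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext context = SSLContext.getInstance("TLS");
        // No client key managers; default SecureRandom.
        context.init(null, tmf.getTrustManagers(), null);
        return context;
    }
}
```

A source or sink could call `TrustStoreLoader.sslContextFrom(...)` in its own `open()` and pass the result to its client, provided that client accepts a custom `SSLContext` or socket factory. That makes the dependency on the downloaded file explicit in the operator that uses it.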

* Flink connectors: DataGeneratorSource, DiscardingSink


This example demonstrates how to use Flink's distributed cache to copy files from the JobManager and distribute them to the TaskManager worker nodes.
Contributor

We need to explain clearly when this is useful, because this example doesn't show the certificate being used. It just copies it.
In particular, we are downloading a cacerts file and not just a random file, so we need to explain when that is useful.


In this example, we download a `cacerts` file for custom trust store from S3, and configure the TaskManager JVM to use this new trust store location.

We could also download the file directly from S3 onto each TaskManager. However, using the filecache is a more Flink-native way to do this, and will ensure that the file is available even on job restarts.
Contributor

The example actually downloads the file from S3, in the main().
Why is this different from downloading it from S3 in the open() of the RichMapFunction, downloading into each TM?

// Perform any custom operations on the copied file here. For example, reconfiguring the trust store of the JVM
LOG.info("Trust store location before modification : " + System.getProperty("javax.net.ssl.trustStore"));
if (!TASKMANAGER_LOCAL_FILE_LOCATION.equals(System.getProperty("javax.net.ssl.trustStore"))) {
    System.setProperty("javax.net.ssl.trustStore", TASKMANAGER_LOCAL_FILE_LOCATION);
Contributor

I guess this truststore is supposed to be used to initialize another operator, like a source or a sink.
How does this guarantee that the open() of this operator happens before the open() of the operator that expects the certificate?

Comment on lines +29 to +37
// Skip the copy if the file already exists locally
if (!localFileDir.exists()) {
    localFileDir.mkdirs();
    File cachedFile = getRuntimeContext().getDistributedCache().getFile(DISTRIBUTED_CACHE_RESOURCE_NAME);
    Files.copy(cachedFile.toPath(), localFileDir.toPath(), StandardCopyOption.REPLACE_EXISTING);
    LOG.info("Copied over resource file {} to taskmanager location {}", DISTRIBUTED_CACHE_RESOURCE_NAME, TASKMANAGER_LOCAL_FILE_LOCATION);
} else {
    LOG.info("Skipping resource copy for resource {} on taskmanager as the file already exists: {}", DISTRIBUTED_CACHE_RESOURCE_NAME, TASKMANAGER_LOCAL_FILE_LOCATION);
}
Contributor

Could we come up with a utility class that encapsulates downloading of the necessary certificates? This way the initialisation logic could be a one liner that can be included by any operator that requires the files.

Ideally that should be an idempotent process that can be run multiple times without side effects.
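
To make the suggestion concrete, here is a rough sketch of what such a utility could look like, using only `java.nio.file`. The class name `LocalFileInstaller` and the method `ensureInstalled` are hypothetical and not part of this PR. The sketch assumes the source is a small local file, such as the one returned by the distributed cache, and that comparing full contents in memory is acceptable.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

/** Hypothetical utility (not part of this PR): idempotently installs a file at a fixed local path. */
final class LocalFileInstaller {

    private LocalFileInstaller() {}

    /**
     * Copies source to target unless target is already a regular file with identical bytes.
     * Returns true if a copy was performed, false if it was skipped.
     * Safe to call repeatedly, for example from the open() of every operator that needs the file.
     */
    static synchronized boolean ensureInstalled(Path source, Path target) throws IOException {
        if (Files.isRegularFile(target)
                && Arrays.equals(Files.readAllBytes(source), Files.readAllBytes(target))) {
            return false; // already installed with the same content
        }

        Path parent = target.toAbsolutePath().getParent();
        Files.createDirectories(parent);

        // Write to a temporary file in the same directory, then move it into place,
        // so that readers never observe a partially written target.
        Path tmp = Files.createTempFile(parent, target.getFileName().toString(), ".tmp");
        try {
            Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
            // Assumption: the platform replaces an existing target on an atomic move (true on POSIX file systems).
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp); // no-op when the move succeeded
        }
        return true;
    }
}
```

With something like this, each operator that depends on the file could call `LocalFileInstaller.ensureInstalled(cachedFile.toPath(), localPath)` in its own `open()`, so correctness would not depend on the order in which operators are opened. The `synchronized` keyword only guards callers that share the same class instance; the temporary-file-and-move step is what is meant to protect against concurrent writers loaded through different classloaders.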

3 participants