
Test Walkthrough Apache Beam and Flink

brightcoder01 edited this page Nov 8, 2019 · 3 revisions

Apache Beam and Flink Walkthrough Test

Set Up Clusters

  1. Set up a local Flink cluster: https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html
  2. Set up a Kubernetes (K8S) Flink cluster using Minikube: https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html

Verify Clusters

Run the command "./bin/flink run examples/streaming/WordCount.jar" against each cluster. The WordCount example job runs successfully on both the local and the K8S Flink cluster.
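Besides submitting WordCount, cluster health can also be checked through Flink's REST API, whose /overview endpoint reports registered TaskManagers and free slots. The sketch below is a minimal stdlib helper, assuming the REST endpoint is reachable at the default http://localhost:8081 (for the Minikube cluster this may require a port-forward or NodePort, depending on your service setup). The function names are hypothetical and not part of Flink or Beam.

```python
import json
import urllib.request


def fetch_overview(rest_url="http://localhost:8081", timeout=5):
    """Fetch the cluster overview JSON from the Flink REST API (/overview)."""
    url = rest_url.rstrip("/") + "/overview"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def cluster_is_ready(overview, min_slots=1):
    """True if at least one TaskManager is registered and enough slots are free."""
    return (overview.get("taskmanagers", 0) > 0
            and overview.get("slots-available", 0) >= min_slots)
```

For example, cluster_is_ready(fetch_overview()) should be True before the WordCount job or a Beam pipeline is submitted.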

Apache Beam Flink Runner

Instruction: https://beam.apache.org/documentation/runners/flink/

Sample Pipeline Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes a Beam Flink job server is listening on localhost:8099.
# LOOPBACK runs the Python SDK worker inside this submitting process.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])

with beam.Pipeline(options=options) as pipeline:
    data = ["Sample data",
            "Sample data - 0",
            "Sample data - 1"]
    (pipeline
     | 'CreateHardCodeData' >> beam.Create(data)
     | 'Map' >> beam.Map(lambda line: line + '.')  # append a period to each line
     | 'Print' >> beam.Map(print))  # output appears where the SDK worker runs
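To run the same pipeline under each environment type tested below, only the options list has to change. The following is a minimal sketch of a hypothetical helper that builds that list; the flag names are the ones used above plus Beam's --environment_config, and the set of accepted environment types (LOOPBACK, DOCKER, PROCESS, EXTERNAL) is an assumption based on the Beam portability options.

```python
# Environment types accepted by this hypothetical helper.
VALID_ENVIRONMENT_TYPES = ("LOOPBACK", "DOCKER", "PROCESS", "EXTERNAL")


def portable_args(job_endpoint, environment_type, environment_config=None):
    """Build PortableRunner arguments for one job endpoint and environment type."""
    env = environment_type.upper()
    if env not in VALID_ENVIRONMENT_TYPES:
        raise ValueError("unsupported environment_type: %r" % environment_type)
    args = [
        "--runner=PortableRunner",
        "--job_endpoint=%s" % job_endpoint,
        "--environment_type=%s" % env,
    ]
    # Optional extra config, e.g. a container image for DOCKER.
    if environment_config is not None:
        args.append("--environment_config=%s" % environment_config)
    return args
```

The result can be passed directly to the pipeline, e.g. PipelineOptions(portable_args("localhost:8099", "DOCKER")), keeping the pipeline body identical across test runs.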

Verify different environment_type values in the Python SDK harness configuration
environment_type=LOOPBACK

  1. Run the pipeline on the local cluster: works fine.
  2. Run the pipeline on the K8S cluster: the following exception is thrown:
    java.lang.Exception: The user defined 'open()' method caused an exception: org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    Caused by: org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:51017
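With LOOPBACK, the SDK worker runs in the process that submitted the pipeline, and the Flink task tries to reach it at localhost on the port shown in the error. A likely explanation, not verified in this test, is that inside a K8S TaskManager pod localhost refers to the pod itself, so nothing is listening there. The stdlib sketch below extracts the address from the error text and probes it; both function names are hypothetical, and running the probe inside the pod assumes a Python interpreter is available there (e.g. via kubectl exec).

```python
import re
import socket

# Matches the address in Netty's "Connection refused: host/ip:port" message.
_REFUSED_RE = re.compile(r"Connection refused: ([^/\s]+)/([0-9.]+):(\d+)")


def parse_refused_address(message):
    """Return (hostname, ip, port) from a 'Connection refused' message, or None."""
    m = _REFUSED_RE.search(message)
    if m is None:
        return None
    return m.group(1), m.group(2), int(m.group(3))


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

The port (51017 above) changes on every run, so it should be read from the current error message rather than hard-coded.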

environment_type=DOCKER

  1. Run the pipeline on the local cluster: works fine.
  2. Run the pipeline on the K8S cluster: the following exception is thrown:
    Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
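With DOCKER, the Flink TaskManager starts the SDK worker by invoking the docker CLI on its own host, so the error indicates that no docker executable is on the TaskManager container's PATH. A quick way to confirm this from any Python environment is a PATH lookup; the helper below is a hypothetical sketch built on shutil.which, and running it inside the TaskManager pod assumes Python is installed there.

```python
import shutil


def docker_available(path=None):
    """Return the full path of the docker CLI if found, else None.

    path: an os.pathsep-separated search path; None means use $PATH.
    """
    return shutil.which("docker", path=path)
```

A None result inside the TaskManager matches the "Cannot run program \"docker\"" failure above, while the local cluster, where the pipeline works, would be expected to return a path.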