From 098d06739b6dc998a13e69077955671d0bb2f247 Mon Sep 17 00:00:00 2001 From: daemin Date: Thu, 27 Jun 2019 07:58:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dflinkx=20on=20yarn=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E8=BF=90=E8=A1=8C=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flinkx/launcher/ClusterClientFactory.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java index a01cc56432..69367529be 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java @@ -23,13 +23,12 @@ import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.*; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.LegacyYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; @@ -126,8 +125,12 @@ public static ClusterClient createYarnClient(LauncherOptions launcherOptions) { throw new RuntimeException("No flink session found on yarn cluster."); } - AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(config, yarnConf, ".", yarnClient, false); - ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId); + HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config); + if(highAvailabilityMode.equals(HighAvailabilityMode.ZOOKEEPER) && applicationId!=null){ + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID,applicationId.toString()); + } + YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(config, yarnConf, "", yarnClient, false); + ClusterClient clusterClient = yarnClusterDescriptor.retrieve(applicationId); clusterClient.setDetached(true); return clusterClient; } catch(Exception e) {