diff --git a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java index a01cc56432..69367529be 100644 --- a/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java +++ b/flinkx-launcher/src/main/java/com/dtstack/flinkx/launcher/ClusterClientFactory.java @@ -23,13 +23,12 @@ import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.*; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.LegacyYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; @@ -126,8 +125,12 @@ public static ClusterClient createYarnClient(LauncherOptions launcherOptions) { throw new RuntimeException("No flink session found on yarn cluster."); } - AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(config, yarnConf, ".", yarnClient, false); - ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId); + HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config); + if(highAvailabilityMode.equals(HighAvailabilityMode.ZOOKEEPER) && applicationId!=null){ + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID,applicationId.toString()); + } + YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(config, yarnConf, "", yarnClient, false); + ClusterClient clusterClient = yarnClusterDescriptor.retrieve(applicationId); clusterClient.setDetached(true); return clusterClient; } catch(Exception e) {