Skip to content

Commit

Permalink
Merge pull request #63 from gk0916/v1.5.1
Browse files Browse the repository at this point in the history
修复flinkx on yarn模式运行报错问题
  • Loading branch information
yangsishu authored Jun 27, 2019
2 parents 96b3da2 + 098d067 commit 8d27f38
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.*;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.LegacyYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
Expand Down Expand Up @@ -126,8 +125,12 @@ public static ClusterClient createYarnClient(LauncherOptions launcherOptions) {
throw new RuntimeException("No flink session found on yarn cluster.");
}

AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(config, yarnConf, ".", yarnClient, false);
ClusterClient clusterClient = clusterDescriptor.retrieve(applicationId);
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
if(highAvailabilityMode.equals(HighAvailabilityMode.ZOOKEEPER) && applicationId!=null){
config.setString(HighAvailabilityOptions.HA_CLUSTER_ID,applicationId.toString());
}
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(config, yarnConf, "", yarnClient, false);
ClusterClient clusterClient = yarnClusterDescriptor.retrieve(applicationId);
clusterClient.setDetached(true);
return clusterClient;
} catch(Exception e) {
Expand Down

0 comments on commit 8d27f38

Please sign in to comment.