diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index 62d917ca0d1..6175e945765 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils; -import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList; - import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory; import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions; @@ -42,8 +40,11 @@ import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.nio.file.DirectoryStream; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -51,44 +52,41 @@ public class HiveMetaStoreProxy { private HiveMetaStoreClient hiveMetaStoreClient; private static volatile HiveMetaStoreProxy INSTANCE = null; - private static final List HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml"); private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI); String hiveHadoopConfigPath = readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH); String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH); - HiveConf hiveConf = new HiveConf(); - hiveConf.set("hive.metastore.uris", metastoreUri); + Configuration hadoopConf = new Configuration(); try { if (StringUtils.isNotBlank(hiveHadoopConfigPath)) { - HADOOP_CONF_FILES.forEach( - confFile -> { - java.nio.file.Path path = Paths.get(hiveHadoopConfigPath, confFile); - if (Files.exists(path)) { - try { - hiveConf.addResource(path.toUri().toURL()); - } catch (IOException e) { - log.warn( - "Error adding Hadoop resource {}, resource was not added", - path, - e); - } - } - }); + getConfigFiles(hiveHadoopConfigPath) + .forEach( + confFile -> { + try { + hadoopConf.addResource(confFile.toUri().toURL()); + } catch (MalformedURLException e) { + log.warn( + "Error adding Hadoop resource {}, resource was not added", + confFile, + e); + } + }); } if (StringUtils.isNotBlank(hiveSitePath)) { - hiveConf.addResource(new File(hiveSitePath).toURI().toURL()); + hadoopConf.addResource(new File(hiveSitePath).toURI().toURL()); } - log.info("hive client conf:{}", hiveConf); + HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class); + hiveConf.set("hive.metastore.uris", metastoreUri); + if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) { // login Kerberos - Configuration authConf = new Configuration(); - authConf.set("hadoop.security.authentication", "kerberos"); + hadoopConf.set("hadoop.security.authentication", "kerberos"); this.hiveMetaStoreClient = HadoopLoginFactory.loginWithKerberos( - authConf, + hadoopConf, readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH), readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL), readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH), @@ -119,7 +117,7 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { } catch (MalformedURLException e) { String errorMsg = String.format( - "Using this hive uris [%s], hive conf [%s] to initialize " + "Using this hive uris [%s], hive hadoopConf [%s] to initialize " + "hive metastore client instance failed", metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH)); throw new HiveConnectorException( @@ -132,6 +130,19 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { } } + private static List getConfigFiles(String hiveHadoopConfigPath) throws IOException { + List configFiles = new ArrayList<>(); + Path dirPath = Paths.get(hiveHadoopConfigPath); + + try (DirectoryStream stream = Files.newDirectoryStream(dirPath, "*-site.xml")) { + for (Path entry : stream) { + configFiles.add(entry); + } + } + + return configFiles; + } + public static HiveMetaStoreProxy getInstance(ReadonlyConfig readonlyConfig) { if (INSTANCE == null) { synchronized (HiveMetaStoreProxy.class) {