
Commit

Cherry-pick commits from DTStack.
tiezhu authored and FlechazoW committed Apr 11, 2022
1 parent 074d3da commit 55fb56b
Showing 44 changed files with 1,343 additions and 516 deletions.
@@ -64,9 +64,6 @@
import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_FETCH_SIZE;
import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_MAX_RETRIES;
import static com.dtstack.flinkx.lookup.options.LookupOptions.LOOKUP_PARALLELISM;
import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static com.dtstack.flinkx.sink.options.SinkOptions.SINK_MAX_RETRIES;
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_FETCH_SIZE;
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN;
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_INCREMENT_COLUMN_TYPE;
@@ -77,6 +74,9 @@
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNNAME;
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_RESTORE_COLUMNTYPE;
import static com.dtstack.flinkx.source.options.SourceOptions.SCAN_START_LOCATION;
import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static com.dtstack.flinkx.table.options.SinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static com.dtstack.flinkx.table.options.SinkOptions.SINK_MAX_RETRIES;

/**
* @author tiezhu
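Note: the only change in this hunk is that SINK_BUFFER_FLUSH_INTERVAL, SINK_BUFFER_FLUSH_MAX_ROWS, and SINK_MAX_RETRIES are now imported from com.dtstack.flinkx.table.options.SinkOptions instead of com.dtstack.flinkx.sink.options.SinkOptions. For orientation, such constants are usually declared with Flink's ConfigOptions builder; the sketch below only illustrates that shape, and the option keys and default values are assumptions, not taken from the repository.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.time.Duration;

// Hypothetical sketch of SinkOptions declared with Flink's ConfigOptions builder.
// Option keys and default values are assumptions, not the project's actual values.
public class SinkOptionsSketch {
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
            ConfigOptions.key("sink.buffer-flush.max-rows")
                    .intType()
                    .defaultValue(1024)
                    .withDescription("Maximum number of rows to buffer before a flush.");

    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
            ConfigOptions.key("sink.buffer-flush.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription("Maximum time buffered rows may wait before a flush.");

    public static final ConfigOption<Integer> SINK_MAX_RETRIES =
            ConfigOptions.key("sink.max-retries")
                    .intType()
                    .defaultValue(3)
                    .withDescription("Maximum number of retries when a write fails.");
}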
@@ -22,7 +22,6 @@
import com.dtstack.flinkx.connector.hbase14.converter.DataSyncSinkConverter;
import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;
import com.dtstack.flinkx.security.KerberosUtil;
import com.dtstack.flinkx.sink.format.BaseRichOutputFormat;
import com.dtstack.flinkx.throwable.WriteRecordException;

@@ -47,8 +46,6 @@
import java.util.Map;
import java.util.Objects;

import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;

/**
* The Hbase Implementation of OutputFormat
*
@@ -111,18 +108,7 @@ protected void writeSingleRecordInternal(RowData rawRecord) throws WriteRecordEx
public void openInternal(int taskNumber, int numTasks) throws IOException {
boolean openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
if (openKerberos) {
// In a TDH environment where ZooKeeper has Kerberos enabled, the ZooKeeper environment variables must be set
if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
String keytabFile =
HBaseHelper.getKeyTabFileName(
hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile);
String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client");
KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal);
}
UserGroupInformation ugi =
HBaseHelper.getUgi(
hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
ugi.doAs(
(PrivilegedAction<Object>)
() -> {
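Note: the output format no longer builds the ZooKeeper JAAS configuration itself; it now fetches the UserGroupInformation through the simplified HBaseHelper.getUgi(hbaseConfig) and keeps running the open logic inside ugi.doAs. A minimal sketch of that privileged-action pattern with the stock Hadoop and HBase client APIs (the configuration handling here is a placeholder, not the connector's actual code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.security.PrivilegedAction;

public class UgiDoAsSketch {
    // Open an HBase connection as the Kerberos-authenticated user held by the UGI.
    public static Connection openAs(UserGroupInformation ugi) {
        return ugi.doAs(
                (PrivilegedAction<Connection>)
                        () -> {
                            try {
                                Configuration conf = HBaseConfiguration.create();
                                return ConnectionFactory.createConnection(conf);
                            } catch (IOException e) {
                                throw new RuntimeException("failed to open HBase connection", e);
                            }
                        });
    }
}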
@@ -20,11 +20,8 @@

import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
import com.dtstack.flinkx.connector.hbase14.util.HBaseHelper;
import com.dtstack.flinkx.security.KerberosUtil;
import com.dtstack.flinkx.source.format.BaseRichInputFormat;
import com.dtstack.flinkx.util.PluginUtil;

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -55,8 +52,6 @@
import java.util.Locale;
import java.util.Map;

import static org.apache.zookeeper.client.ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;

/**
* The InputFormat Implementation used for HbaseReader
*
@@ -96,31 +91,17 @@ public void openInputFormat() throws IOException {

LOG.info("HbaseOutputFormat openInputFormat start");
nameMaps = Maps.newConcurrentMap();
// In a TDH environment, when the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
}
connection =
HBaseHelper.getHbaseConnection(
hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);

connection = HBaseHelper.getHbaseConnection(hbaseConfig);

LOG.info("HbaseOutputFormat openInputFormat end");
}

@Override
public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOException {
DistributedCache distributedCache =
PluginUtil.createDistributedCacheFromContextClassLoader();
// In a TDH environment, when the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
}
try (Connection connection =
HBaseHelper.getHbaseConnection(hbaseConfig, distributedCache, jobId)) {
try (Connection connection = HBaseHelper.getHbaseConnection(hbaseConfig)) {
if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) {
UserGroupInformation ugi =
HBaseHelper.getUgi(
hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
UserGroupInformation ugi = HBaseHelper.getUgi(hbaseConfig);
return ugi.doAs(
(PrivilegedAction<HBaseInputSplit[]>)
() ->
@@ -261,16 +242,10 @@ public void openInternal(InputSplit inputSplit) throws IOException {
byte[] stopRow = Bytes.toBytesBinary(hbaseInputSplit.getEndKey());

if (null == connection || connection.isClosed()) {
connection =
HBaseHelper.getHbaseConnection(
hbaseConfig, getRuntimeContext().getDistributedCache(), jobId);
connection = HBaseHelper.getHbaseConnection(hbaseConfig);
}

openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
// In a TDH environment, when the ZooKeeper used by HBase has Kerberos enabled, the ZooKeeper JAAS file configuration must be added
if (HBaseHelper.openKerberosForZk(hbaseConfig)) {
setZkJaasConfiguration(getRuntimeContext().getDistributedCache());
}

table = connection.getTable(TableName.valueOf(tableName));
scan = new Scan();
@@ -405,12 +380,4 @@ public Object convertBytesToAssignType(String columnType, byte[] byteArray, Stri
}
return column;
}

// Set the ZooKeeper JAAS configuration
private void setZkJaasConfiguration(DistributedCache distributedCache) {
String keytabFile = HBaseHelper.getKeyTabFileName(hbaseConfig, distributedCache, jobId);
String principal = KerberosUtil.getPrincipal(hbaseConfig, keytabFile);
String client = System.getProperty(LOGIN_CONTEXT_NAME_KEY, "Client");
KerberosUtil.appendOrUpdateJaasConf(client, keytabFile, principal);
}
}
@@ -19,9 +19,6 @@
package com.dtstack.flinkx.connector.hbase14.util;

import com.dtstack.flinkx.security.KerberosUtil;
import com.dtstack.flinkx.util.FileSystemUtil;

import org.apache.flink.api.common.cache.DistributedCache;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -44,6 +41,7 @@
import java.security.PrivilegedAction;
import java.util.Map;

import static com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF;
import static com.dtstack.flinkx.security.KerberosUtil.KRB_STR;

/**
@@ -60,12 +58,11 @@ public class HBaseHelper {
private static final String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization";
private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable";

public static Connection getHbaseConnection(
Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
public static Connection getHbaseConnection(Map<String, Object> hbaseConfigMap) {
Validate.isTrue(MapUtils.isNotEmpty(hbaseConfigMap), "hbaseConfig must not be an empty Map!");

if (HBaseConfigUtils.isEnableKerberos(hbaseConfigMap)) {
return getConnectionWithKerberos(hbaseConfigMap, distributedCache, jobId);
return getConnectionWithKerberos(hbaseConfigMap);
}

try {
@@ -77,11 +74,10 @@ public static Connection getHbaseConnection(
}
}

private static Connection getConnectionWithKerberos(
Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
private static Connection getConnectionWithKerberos(Map<String, Object> hbaseConfigMap) {
try {
setKerberosConf(hbaseConfigMap);
UserGroupInformation ugi = getUgi(hbaseConfigMap, distributedCache, jobId);
UserGroupInformation ugi = getUgi(hbaseConfigMap);
return ugi.doAs(
(PrivilegedAction<Connection>)
() -> {
@@ -98,19 +94,17 @@ private static Connection getConnectionWithKerberos(
}
}

public static UserGroupInformation getUgi(
Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId)
public static UserGroupInformation getUgi(Map<String, Object> hbaseConfigMap)
throws IOException {
String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap);

keytabFileName =
KerberosUtil.loadFile(hbaseConfigMap, keytabFileName, distributedCache, jobId);
keytabFileName = KerberosUtil.loadFile(hbaseConfigMap, keytabFileName);
String principal = KerberosUtil.getPrincipal(hbaseConfigMap, keytabFileName);
KerberosUtil.loadKrb5Conf(hbaseConfigMap, distributedCache, jobId);
KerberosUtil.loadKrb5Conf(hbaseConfigMap);
KerberosUtil.refreshConfig();
Configuration conf = FileSystemUtil.getConfiguration(hbaseConfigMap, null);

return KerberosUtil.loginAndReturnUgi(conf, principal, keytabFileName);
return KerberosUtil.loginAndReturnUgi(
principal, keytabFileName, System.getProperty(KEY_JAVA_SECURITY_KRB5_CONF));
}

public static Configuration getConfig(Map<String, Object> hbaseConfigMap) {
@@ -135,33 +129,6 @@ public static void setKerberosConf(Map<String, Object> hbaseConfigMap) {
hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true);
}

/**
* Check whether the ZooKeeper associated with HBase also has Kerberos enabled
*
* @param hbaseConfigMap
* @return
*/
public static boolean openKerberosForZk(Map<String, Object> hbaseConfigMap) {
String openKerberos =
MapUtils.getString(hbaseConfigMap, "zookeeper.security.authentication", "default");
return "kerberos".equalsIgnoreCase(openKerberos);
}

/**
* Get the local path of the keytab file
*
* @param hbaseConfigMap
* @param distributedCache
* @param jobId
* @return
*/
public static String getKeyTabFileName(
Map<String, Object> hbaseConfigMap, DistributedCache distributedCache, String jobId) {
String keytabFileName = KerberosUtil.getPrincipalFileName(hbaseConfigMap);
return KerberosUtil.getLocalFileName(
hbaseConfigMap, keytabFileName, distributedCache, jobId);
}

public static RegionLocator getRegionLocator(Connection hConnection, String userTable) {
TableName hTableName = TableName.valueOf(userTable);
Admin admin = null;
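Note: getUgi no longer needs a DistributedCache or jobId; it resolves the keytab and krb5.conf from the config map and calls KerberosUtil.loginAndReturnUgi(principal, keytabFileName, krb5ConfPath). A rough sketch of such a login on top of the stock Hadoop API, assuming loginAndReturnUgi wraps UserGroupInformation.loginUserFromKeytabAndReturnUGI (an assumption, not the project's actual implementation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

public final class KerberosLoginSketch {
    public static UserGroupInformation login(String principal, String keytabPath, String krb5ConfPath)
            throws IOException {
        // Point the JVM at the krb5.conf before any Kerberos handshake is attempted.
        System.setProperty("java.security.krb5.conf", krb5ConfPath);

        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Log in from the keytab and return the UGI without replacing the static login user.
        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
    }
}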
@@ -145,10 +145,7 @@ protected void openSource() {
try {
fs =
FileSystemUtil.getFileSystem(
hdfsConf.getHadoopConfig(),
hdfsConf.getDefaultFS(),
distributedCache,
jobId);
hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache);
} catch (Exception e) {
throw new FlinkxRuntimeException("can't init fileSystem", e);
}
@@ -184,12 +181,10 @@ protected List<String> copyTmpDataFileToDir() {
try {
FileStatus[] dataFiles = fs.listStatus(tmpDir, pathFilter);
for (FileStatus dataFile : dataFiles) {
if (!filterFile(dataFile)) {
currentFilePath = dataFile.getPath().getName();
FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf);
copyList.add(currentFilePath);
LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir);
}
currentFilePath = dataFile.getPath().getName();
FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf);
copyList.add(currentFilePath);
LOG.info("copy temp file:{} to dir:{}", currentFilePath, dir);
}
} catch (Exception e) {
throw new FlinkxRuntimeException(
@@ -229,11 +224,9 @@ protected void moveAllTmpDataFileToDir() {

FileStatus[] dataFiles = fs.listStatus(tmpDir);
for (FileStatus dataFile : dataFiles) {
if (!filterFile(dataFile)) {
currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
}
currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
}
fs.delete(tmpDir, true);
} catch (IOException e) {
@@ -294,16 +287,4 @@ public HdfsConf getHdfsConf() {
public void setHdfsConf(HdfsConf hdfsConf) {
this.hdfsConf = hdfsConf;
}

/** Filter files when moving them to the data path. */
protected boolean filterFile(FileStatus fileStatus) {
if (fileStatus.getLen() == 0) {
LOG.warn(
"file {} has filter,because file len [{}] is 0 ",
fileStatus.getPath(),
fileStatus.getLen());
return true;
}
return false;
}
}
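Note: with filterFile removed, every file in the temporary directory is copied or moved to the target directory regardless of its length. A self-contained sketch of the unconditional copy loop using the stock Hadoop FileSystem API (class and method names below are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class TmpDirCopySketch {
    // Copy every file from the temp dir into the final data dir, with no size-based filtering.
    public static void copyAll(FileSystem fs, Path tmpDir, Path dir, Configuration conf)
            throws IOException {
        for (FileStatus dataFile : fs.listStatus(tmpDir)) {
            FileUtil.copy(fs, dataFile.getPath(), fs, dir, false, conf);
        }
    }
}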
@@ -33,7 +33,6 @@

import org.apache.flink.table.data.RowData;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.parquet.column.ParquetProperties;
@@ -51,7 +50,6 @@

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
@@ -71,9 +69,6 @@ public class HdfsParquetOutputFormat extends BaseHdfsOutputFormat {
private ParquetWriter<Group> writer;
private MessageType schema;

public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
public static int PARQUET_FOOTER_LENGTH_SIZE = 4;

@Override
protected void openSource() {
super.openSource();
@@ -135,8 +130,7 @@ protected void nextBlock() {
FileSystemUtil.getUGI(
hdfsConf.getHadoopConfig(),
hdfsConf.getDefaultFS(),
getRuntimeContext().getDistributedCache(),
jobId);
getRuntimeContext().getDistributedCache());
ugi.doAs(
(PrivilegedAction<Object>)
() -> {
@@ -297,18 +291,4 @@ private MessageType buildSchema() {

return typeBuilder.named("Pair");
}

@Override
protected boolean filterFile(FileStatus fileStatus) {
if (fileStatus.getLen()
< (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length)) {
LOG.warn(
"file {} has filter,because file len [{}] less than [{}] ",
fileStatus.getPath(),
fileStatus.getLen(),
(long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length));
return true;
}
return false;
}
}
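Note: the removed filterFile override skipped files shorter than the smallest structurally valid Parquet file, i.e. the 4-byte "PAR1" header magic plus a 4-byte footer length plus the 4-byte trailing magic, 12 bytes in total. A minimal sketch of that size check, kept here only to document the behaviour being dropped:

import java.nio.charset.StandardCharsets;

public class ParquetMinSizeCheckSketch {
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);
    private static final int PARQUET_FOOTER_LENGTH_SIZE = 4;

    // A Parquet file shorter than header magic + footer length + trailing magic (12 bytes)
    // cannot be structurally valid and was previously skipped.
    public static boolean isTooShort(long fileLen) {
        return fileLen < (long) (MAGIC.length + PARQUET_FOOTER_LENGTH_SIZE + MAGIC.length);
    }
}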
@@ -70,10 +70,7 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws IOExcepti
PluginUtil.createDistributedCacheFromContextClassLoader();
UserGroupInformation ugi =
FileSystemUtil.getUGI(
hdfsConf.getHadoopConfig(),
hdfsConf.getDefaultFS(),
distributedCache,
jobId);
hdfsConf.getHadoopConfig(), hdfsConf.getDefaultFS(), distributedCache);
LOG.info("user:{}, ", ugi.getShortUserName());
return ugi.doAs(
(PrivilegedAction<InputSplit[]>)
@@ -101,8 +98,7 @@ public void openInputFormat() throws IOException {
FileSystemUtil.getUGI(
hdfsConf.getHadoopConfig(),
hdfsConf.getDefaultFS(),
getRuntimeContext().getDistributedCache(),
jobId);
getRuntimeContext().getDistributedCache());
}
}

@@ -104,8 +104,7 @@ public InputSplit[] createHdfsSplit(int minNumSplits) {
FileSystemUtil.getFileSystem(
hdfsConf.getHadoopConfig(),
hdfsConf.getDefaultFS(),
PluginUtil.createDistributedCacheFromContextClassLoader(),
jobId)) {
PluginUtil.createDistributedCacheFromContextClassLoader())) {
allFilePaths = getAllPartitionPath(hdfsConf.getPath(), fs, pathFilter);
} catch (Exception e) {
throw new FlinkxRuntimeException(e);
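Note: the remaining hunks drop the jobId argument from FileSystemUtil.getFileSystem and FileSystemUtil.getUGI. For reference, opening a Hadoop FileSystem from a plain config map and a defaultFS URI typically reduces to the sketch below (an assumption about what the helper does, not its actual code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

public class FileSystemSketch {
    // Build a Configuration from the user-supplied map and open the FileSystem for defaultFS.
    public static FileSystem getFileSystem(Map<String, Object> hadoopConfig, String defaultFS)
            throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        if (hadoopConfig != null) {
            for (Map.Entry<String, Object> entry : hadoopConfig.entrySet()) {
                if (entry.getValue() != null) {
                    conf.set(entry.getKey(), entry.getValue().toString());
                }
            }
        }
        conf.set("fs.defaultFS", defaultFS);
        return FileSystem.get(new URI(defaultFS), conf);
    }
}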
