import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
/**
 * Demo: authenticate against a Kerberos-secured HDFS cluster from a keytab
 * and recursively list the files under {@code /data}.
 *
 * <p>Flow: download krb5.conf and the keytab into a local staging directory,
 * expose them to the JVM via system properties, log in through
 * {@link UserGroupInformation}, then use a {@link FileSystem} client.
 */
public class HdfsKerberosDemo {

    /** Local staging directory for the downloaded Kerberos files. */
    private static final String TEMP_DIR = "/root/temp/";

    public static void main(String[] args) throws IOException {
        // Equivalent VM options:
        // -Djava.security.krb5.conf=/home/xxx/kerberos/krb5.conf
        // -Dkeytab.path=/home/xxx/kerberos/hive.service.keytab
        String krb5File = "/xxx/krb5.conf";
        String tempConfPath = fetchToTemp(krb5File);
        // Point the JVM's Kerberos implementation at the downloaded krb5.conf.
        System.setProperty("java.security.krb5.conf", tempConfPath);

        // FIX: the original downloaded "/xxx/krb5.conf" a second time here
        // (copy-paste bug) — this must be the keytab, per the VM option above.
        String keytabFile = "/xxx/hive.service.keytab";
        String tempKeytabPath = fetchToTemp(keytabFile);
        System.setProperty("keytab.path", tempKeytabPath);

        // Load the Hadoop client configuration from the classpath.
        // FIX: original referenced HdfsKerberosExample.class, which does not
        // exist in this file — the class is HdfsKerberosDemo (compile error).
        Configuration configuration = new Configuration();
        configuration.addResource(new Path(HdfsKerberosDemo.class.getClassLoader().getResource("core-site.xml").getPath()));
        configuration.addResource(new Path(HdfsKerberosDemo.class.getClassLoader().getResource("hdfs-site.xml").getPath()));

        // Verify both Kerberos-related system properties are in place.
        String krb5conf = System.getProperty("java.security.krb5.conf");
        String keytabPath = System.getProperty("keytab.path");
        if (krb5conf == null) {
            System.out.println("未找到krb5.conf,请配置VMOptions[java.security.krb5.conf]");
            return;
        } else if (keytabPath == null) {
            // FIX: message previously mentioned krb5.conf; this branch is about the keytab.
            System.out.println("未找到keytab,请配置VMOptions[keytab.path]");
            return;
        }

        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        configuration.set("fs.defaultFS", "hdfs://xxxx"); // HDFS NameNode address
        configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        // Enable automatic keytab-based ticket renewal.
        configuration.set("hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
        configuration.set("hadoop.security.authentication", "Kerberos");

        // Authenticate via UserGroupInformation with the service principal.
        UserGroupInformation.setConfiguration(configuration);
        UserGroupInformation.loginUserFromKeytab("hive/datasophon01@HADOOP.COM", keytabPath);

        // FIX: the FileSystem was never closed (resource leak) and
        // FileSystem.get() was called twice; reuse one instance and close it.
        try (FileSystem fileSystem = FileSystem.get(configuration)) {
            Path rootPath = new Path("/data");
            System.out.println(fileSystem.getStatus(rootPath));
            System.out.println("======= ROOT(/) Files ======");
            RemoteIterator<LocatedFileStatus> list = fileSystem.listFiles(rootPath, true);
            while (list.hasNext()) {
                LocatedFileStatus fileStatus = list.next();
                System.out.println("文件路径为" + fileStatus.getPath());
            }
        } finally {
            // FIX: log out even if listing fails.
            UserGroupInformation.getLoginUser().logoutUserFromKeytab();
        }
    }

    /**
     * Downloads {@code url} into {@link #TEMP_DIR}, keeping the source file
     * name. On failure any partially written file is removed and a
     * {@link RuntimeException} carrying the original cause is thrown.
     *
     * @param url source URL of the file to stage locally
     * @return absolute local path of the downloaded file
     */
    private static String fetchToTemp(String url) {
        String fileName = url.substring(url.lastIndexOf('/') + 1);
        String localPath = TEMP_DIR + fileName;
        try {
            download(url, localPath);
            return localPath;
        } catch (IOException e) {
            // Best-effort cleanup of a partial download; the original failure
            // is what gets reported, so a secondary delete error is ignored.
            try {
                Files.deleteIfExists(Paths.get(localPath));
            } catch (IOException ignored) {
                // nothing more we can do here
            }
            // FIX: preserve the cause instead of discarding it.
            throw new RuntimeException("获取" + fileName + "文件失败", e);
        }
    }

    /**
     * Streams the resource at {@code url} to {@code localPath}, replacing any
     * existing file.
     *
     * @param url       source URL (anything {@link URL#openStream()} supports)
     * @param localPath destination path on the local filesystem
     * @throws IOException if the download or the copy fails
     */
    private static void download(String url, String localPath) throws IOException {
        URL website = new URL(url);
        try (InputStream in = website.openStream()) {
            Files.copy(in, Paths.get(localPath), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
Windows 11上执行上述代码正常,但在Linux服务器上执行同样的代码,用FileSystem Client去操作HDFS文件的时候,应用报如下的错误:org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]。
在确认了集群已经是开启Kerberos认证之后,去看详细的日志相关信息,看到了如下的提示:
Login successful for user hdfs/xxxx@xxx.COM using keytab file /root/temp/xxxKEYTABFILE
说明应用端的Kerberos认证其实已经通过了,但是在操作HDFS文件的时候为什么还是报了Client cannot authenticate via:[TOKEN, KERBEROS]的错。
解决:换成如下代码
// NOTE(review): this is a fragment pasted from a larger method —
// `linkParameter`, `taskDatasourceConfigDTO`, `download(...)` and
// `HdfsKerberosExample` are defined in the enclosing (not shown) scope.
// The D:\Download paths indicate this variant was exercised on Windows;
// adjust the staging directory when running on Linux.
String krb5File = "/xxx/krb5.conf";
// Derive the bare file name from the source path.
String fileName = krb5File.substring(krb5File.lastIndexOf("/")+1);
String tempConfPath = "D:\\Download\\" + fileName;
try {
download(krb5File, tempConfPath);
} catch (IOException e) {
// Download failed: delete any partially written file before bailing out.
File folder = new File("D:\\Download");
String[] files = folder.list();
boolean fileExists = false;
if (files != null) {
for (String file : files) {
if (file.equals(fileName)) {
fileExists = true;
break;
}
}
}
if (fileExists) {
//log.info("-------------------krb5conf文件存在-------------");
try {
Files.delete(Paths.get(tempConfPath));
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
throw new RuntimeException("获取krb5.conf文件失败");
}
/** Point the JVM's Kerberos implementation at the downloaded krb5.conf. */
System.setProperty("java.security.krb5.conf", tempConfPath);
// NOTE(review): copy-paste issue — the "keytab" source below is the same
// "/xxx/krb5.conf" path as above; it should point at the actual keytab file.
String keytabFile = "/xxx/krb5.conf";
String keytabName = keytabFile.substring(keytabFile.lastIndexOf("/")+1);
String tempKeytabPath = "D:\\Download\\" + keytabName;
try {
download(keytabFile, tempKeytabPath);
} catch (IOException e) {
// Same cleanup pattern as for krb5.conf above.
File folder = new File("D:\\Download");
String[] files = folder.list();
boolean fileExists = false;
if (files != null) {
for (String file : files) {
if (file.equals(keytabName)) {
fileExists = true;
break;
}
}
}
if (fileExists) {
//log.info("-------------------krb5conf文件存在-------------");
try {
Files.delete(Paths.get(tempKeytabPath));
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
throw new RuntimeException("获取krb5.conf文件失败");
}
System.setProperty("keytab.path", tempKeytabPath);
// Load the Hadoop client configuration from the classpath.
Configuration configuration = new Configuration();
configuration.addResource(new Path(HdfsKerberosExample.class.getClassLoader().getResource("core-site.xml").getPath()));
configuration.addResource(new Path(HdfsKerberosExample.class.getClassLoader().getResource("hdfs-site.xml").getPath()));
// Check that the Kerberos-related system properties are set.
String krb5conf = System.getProperty("java.security.krb5.conf");
String keytabPath = System.getProperty("keytab.path");
if (krb5conf == null) {
System.out.println("未找到krb5.conf,请配置VMOptions[java.security.krb5.conf]");
return;
} else if (keytabPath == null) {
System.out.println("未找到krb5.conf,请配置VMOptions[keytab.path]");
return;
}
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
configuration.set("fs.defaultFS", "hdfs://" + linkParameter.getHdfsAddress()); // HDFS NameNode address
configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
//configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
// Key change vs. the original: instead of registering fs.hdfs.impl by name,
// force both the thread-context classloader and the Configuration's
// classloader to the loader that loaded the HDFS client — presumably so the
// hdfs:// scheme and its authentication providers resolve from the right
// classloader at runtime (TODO confirm against the deployment's classloading
// setup; this was the change that cured the [TOKEN, KERBEROS] error).
Thread.currentThread().setContextClassLoader(org.apache.hadoop.hdfs.DistributedFileSystem.class.getClassLoader());
configuration.setClassLoader(org.apache.hadoop.hdfs.DistributedFileSystem.class.getClassLoader());
//configuration.setClassLoader(org.apache.hadoop.hdfs.DistributedFileSystem.class.getClassLoader());
// Enable automatic keytab-based ticket renewal.
configuration.set("hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
configuration.set("hadoop.security.authentication", taskDatasourceConfigDTO.getAuthentication());
// Authenticate via UserGroupInformation with the service principal.
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab("hive/datasophon01@HADOOP.COM", keytabPath);
System.out.println("====== 打印当前登录用户 START =====");
//System.out.println("user:" + UserGroupInformation.getCurrentUser());
System.out.println("====== 打印当前登录用户 END =====\n");
// Create the FileSystem client instance.
FileSystem fileSystem = FileSystem.get(configuration);
// Root path to list recursively.
Path rootPath = new Path("/data");
System.out.println("====== ACL =====");
// ACL dump left disabled.
/*AclStatus aclStatus = fileSystem.getAclStatus(rootPath);
System.out.println(aclStatus);
System.out.println();*/
System.out.println(fileSystem.getStatus(new Path("/data")));
System.out.println("======= ROOT(/) Files ======");
RemoteIterator<LocatedFileStatus> list = FileSystem.get(configuration).listFiles(rootPath,true);
while (list.hasNext()){
LocatedFileStatus fileStatus =list.next();
System.out.println("文件路径为" + fileStatus.getPath());
}
/*RemoteIterator<LocatedFileStatus> fileStatus = fileSystem.listFiles(rootPath,true);
System.out.println("成功获取文件系统,正在导出文件系统内容,请稍后...");
FileWriter writer = new FileWriter("hdfs-files.txt");
while (fileStatus.hasNext()){
LocatedFileStatus st = fileStatus.next();
writer.write(st.getPath().toString());
}
writer.close();*/
System.out.println("hdfs文件内容写入成功");
// Log the authenticated user out.
// UserGroupInformation.getLoginUser().logout();
UserGroupInformation.getLoginUser().logoutUserFromKeytab();
解决问题的核心改动是如下三行:
// The three lines that made the Linux run succeed: register the local
// filesystem implementation explicitly, and point both the thread-context
// classloader and the Configuration's classloader at the loader of the HDFS
// client — presumably so the hdfs:// scheme and its SASL/Kerberos providers
// are found via the correct classloader (NOTE(review): confirm against the
// deployment's classloading setup).
configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());// configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
Thread.currentThread().setContextClassLoader(org.apache.hadoop.hdfs.DistributedFileSystem.class.getClassLoader());
configuration.setClassLoader(org.apache.hadoop.hdfs.DistributedFileSystem.class.getClassLoader());