如果通过公网的方式访问namenode,也就是公网存储文件,需要
服务器 hdfs-site.xml需要添加
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
<description>only cofig in clients</description>
</property>
客户端测试代码 (注意如果是本地需要连接线上那么本地也需要下载hadoop并配置hadoop的环境变量)
Configuration conf = new Configuration();
conf.set("dfs.client.use.datanode.hostname", "true");
FileSystem fs = FileSystem.get(new URI("hdfs://106.53.25.61:9000"), conf);
fs.copyFromLocalFile(new Path("D:/aaa.txt"), new Path("/user"));
//fs.copyToLocalFile(new Path("/user/NOTICE.txt"),new Path("/user/aa.txt"));
//InputStream in = fs.open(new Path("/user/NOTICE.txt"));
//FileOutputStream out = new FileOutputStream(new File("D:/A.txt"));
//IOUtils.copyBytes(in, out, 2048, true);
fs.close();
System.out.println("上传完毕");
工具类代码:
package com.tools.common.helper;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
@Component
@Slf4j
public class HdfsHelper {
private static String hdfsUrl;
public static String rootDir;
private static Configuration configuration;
private static void buildConfig() {
if (ObjectUtils.isEmpty(configuration)) {
configuration = new Configuration();
configuration.set("dfs.client.use.datanode.hostname", "true");
}
}
/**
* 创建目录
*
* @param path 需要创建的目录 例如 /user/aa 可以建立多级
* @return
*/
public static boolean mkdir(String path) throws Exception {
buildConfig();
FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
fileSystem.mkdirs(new Path(String.format("%s%s", rootDir, path)));
fileSystem.close();
return true;
}
/**
* 上传文件
*
* @param sourcePath 本地文件路径 例如 D:/test.txt
* @param targetPath hdfs存放路径 例如 /test/test.txt
* @param delFile 是否删除本地文件
* @return
*/
public static boolean upload(String sourcePath, String targetPath, boolean delFile) throws Exception {
buildConfig();
FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
fileSystem.copyFromLocalFile(delFile, new Path(sourcePath), new Path(String.format("%s%s", rootDir, targetPath)));
fileSystem.close();
return true;
}
/**
* 上传文件
*
* @param sourcePath 本地文件路径 例如 D:/test.txt
* @param targetPath hdfs存放路径 例如 /test/test.txt
* @return
*/
public static boolean upload(String sourcePath, String targetPath) throws Exception {
return upload(sourcePath, targetPath, false);
}
/**
* 下载文件
*
* @param sourcePath hdfs文件路径 例如 /test/test.txt
* @param targetPath 本地存放路径 例如 D:/test.txt
* @param delFile
* @return
* @throws Exception
*/
public static boolean download(String sourcePath, String targetPath, boolean delFile) throws IOException {
buildConfig();
FileSystem fileSystem = null;
FSDataInputStream fsdis = null;
FileOutputStream fos = null;
try {
fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
fsdis = fileSystem.open(new Path(String.format("%s%s", rootDir, sourcePath)));
fos = new FileOutputStream(targetPath);
byte[] buffer = new byte[2048];
int index = 0;
while ((index = fsdis.read(buffer)) != -1) {
fos.write(buffer, 0, index);
}
fos.flush();
if (delFile) {
delete(sourcePath);
}
} catch (Exception e) {
log.error("HdfsHelper download error:", e);
return false;
} finally {
if (!ObjectUtils.isEmpty(fos)) {
fos.close();
}
if (!ObjectUtils.isEmpty(fsdis)) {
fsdis.close();
}
if (!ObjectUtils.isEmpty(fileSystem)) {
fileSystem.close();
}
}
return true;
}
/**
* 获取文件字节数组
*
* @param sourcePath hdfs文件路径 例如 /test/test.txt
* @return
* @throws Exception
*/
public static byte[] getFileBytes(String sourcePath) throws IOException {
buildConfig();
FileSystem fileSystem = null;
FSDataInputStream fsdis = null;
ByteArrayOutputStream bos = null;
byte[] bytes;
try {
fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
fsdis = fileSystem.open(new Path(String.format("%s%s", rootDir, sourcePath)));
bos = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int index = 0;
while ((index = fsdis.read(buffer)) != -1) {
bos.write(buffer, 0, index);
}
bytes = bos.toByteArray();
} catch (Exception e) {
log.error("HdfsHelper download error:", e);
return null;
} finally {
if (!ObjectUtils.isEmpty(bos)) {
bos.close();
}
if (!ObjectUtils.isEmpty(fsdis)) {
fsdis.close();
}
if (!ObjectUtils.isEmpty(fileSystem)) {
fileSystem.close();
}
}
return bytes;
}
/**
* 下载文件
*
* @param sourcePath hdfs文件路径 例如 /test/test.txt
* @param targetPath 本地存放路径 例如 D:/test.txt
* @return
* @throws Exception
*/
public static boolean download(String sourcePath, String targetPath) throws Exception {
return download(sourcePath, targetPath, false);
}
/**
* 判断文件或者目录是否存在
*
* @param path
* @return
*/
public static boolean isExist(String path) throws Exception {
buildConfig();
FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
return fileSystem.exists(new Path(String.format("%s%s", rootDir, path)));
}
/**
* 删除文件或者目录
*
* @param path 目录路径 或者 文件路径
* @return
* @throws Exception
*/
public static boolean delete(String path) throws Exception {
buildConfig();
FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), configuration);
fileSystem.deleteOnExit(new Path(String.format("%s%s", rootDir, path)));
fileSystem.close();
return true;
}
@Value("${hdfs.url}")
public void setHdfsUrl(String hdfsUrl) {
HdfsHelper.hdfsUrl = hdfsUrl;
}
@Value("${hdfs.root-dir}")
public void setRootDir(String rootDir) {
HdfsHelper.rootDir = rootDir;
}
}
项目yml配置
hdfs:
url: hdfs://你的ip:9000
root-dir: /test
碰到的问题
- win10的3.2.1存在无法格式化namenode的问题,切换到3.1.2版本后此问题解决。