今天周末,闲着无事,学着写一下第一个Hadoop应用的helloWorld程序
本次程序实现目的:计算一个文本文件各个单词出现的次数
首先,准备工作:
- window10
- VMWare
- ubuntu18.04
- idea(window版)
大致步骤: - 创建Hadoop项目
- 打包并上传到服务器上
- 使用hadoop启动测试程序
1. 创建Hadoop
首先使用maven随便创建一个Mouble
第二步,由于只是做一个后端测试程序,没必要其他的jar包,直接next
第三步,填写项目名和组,后面直接finish即可
第四步,对pom.xml文件编辑,确定hadoop的版本
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
</dependencies>
第五步,创建Mapper类
/**
* 4个泛型中,前两个指定mapper输入数据的类型,KEYIN是输入key的类型,VALUEIN是输入的value的类型
* map和reduce的数据输入输出都是以key-value对的形式封装的
* 默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
//mapreduce框架每读一行数据,就调用一次该方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
// key是这一行数据的起始偏移量, value是这一行的文本内容
String line = value.toString();
String words[] = StringUtils.split(line, ' ');
//遍历单词数组为key、value形式
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}
第六步,创建Reduce类
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
//框架的map处理完成之后,将所有kv对缓存起来后,进行分组,然后传递一个组<key, value[]>,调用一次reduce方法
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
接着,就是作业类,用于指定使用哪个map以及哪个reduce
/**
* 用来测试一个特定的作业
* 比如,该作业使用哪个类作为逻辑处理中的map,哪个作为reduce
* 还可以指定该作业要处理的数据所在的路径
* 还可以指定作业输出的结果放到哪个路径
*/
public class WCRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置整个job所用的哪些类在哪个jar包
job.setJarByClass(WCRunner.class);
//本作业job使用的的mapper和reducer的类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//指定reduce的输出数据kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定原始数据存放在哪里
FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/mr/input/"));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/mr/output/"));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
2. 打包,并上传文件
程序在这个就完了,然后就可以打包了。但我是在window上进行编程打包的,要把jar包放置到ubuntu虚拟机上,原本我在网上找了很多方法,比如下载什么软件、设置共享文件等等,后来发现,其实没必要这么麻烦。
我打开window的cmd,使用命令如下,链接linux,然后将window路径下的文件直接放置上去
#用hadoop账户链接master服务器,输入密码后就下载
sftp hadoop@master
#将Jar包放到linux目录下
put D:\code\HadoopDemo.jar
3. 启动Jar测试
在linux服务器上,先启动Hadoop的服务
#启动hdfs分布式文件和yarn资源管理系统
$ sbin/start-dfs.sh
$ sbin/start-yarn.sh
#确保hadoop正常运行
$ jps
3972 NodeManager
3304 DataNode
3806 ResourceManager
3134 NameNode
3550 SecondaryNameNode
7726 Jps
hadoop服务启动后,创建输入文件夹:
hadoop fs -mkdir /mr/
hadoop fs -mkdir /mr/input/
然后将需要测试的文件放入到输入文件中
# 其中text.txt文件是测试的文件
hadoop fs -put text.txt /mr/input/
现在就到启动我们的测试程序了
hadoop jar HadoopDemo-1.0-SNAPSHOT.jar com.xmx.wh.hadoop.mapreduce.wordcount.WCRunner
执行完毕后,可以用hadoop指令查看运行结果
# output是生成的文件夹,可以
$ hadoop fs -ls /mr/output
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2021-04-17 17:23 /mr/output/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 55 2021-04-17 17:23 /mr/output/part-r-00000
#查看结果
$ hadoop fs -cat /mr/output/part-r-00000
1
ello 6
hello 18
hhello 1
hhhhhhello 1
one 13
two 13