前言
本文是个人之前纪录的MapReduce学习笔记,主要涉及到MapReduce基本概念、Hadoop 经典示例WordCount的使用解析、hdfs与hbase的简单了解使用。现在整理了一下分享出来,希望对别人有所帮助。
学习MapReduce一定要理解这种Map、Reduce的编程模型以及Mapper、Reducer数据处理的原理,否则只是一味的复制粘贴可能比较难上手。
同时学习大数据的知识,一定要将自己对分布式的理解研究透彻。
一、概念理解
- MapReduce 是一种线性的可伸缩的编程模型,用于大规模数据集(大于1TB)的并行运算
- 在MapReduce里,Map处理的是原始数据,每条数据之间互相没有关系(这一点一定要注意)。Reduce阶段,以key为标识,对同一个key下的value进行统计,类似{key,[value1,value2……]}
- 可以把MapReduce理解为,把一堆杂乱无章的数据按照某种特征归纳起来,然后处理并得到最后的结果。
Map面对的是杂乱无章的互不相关的数据,它解析每个数据,从中提取出key和value,也就是提取了数据的特征。
经过MapReduce的Shuffle阶段之后,在Reduce阶段看到的都是已经归纳好的数据了,在此基础上我们可以做进一步的处理以便得到结果。 - 缺点:不适用于实时计算,实时计算一般最低都是要求秒级响应的,MR很难满足这个要求,实时计算一般采用storm等流式计算系统
-
MapReduce计算流程
二、编程模型
- 每个应用程序称为一个作业(Job),每个Job是由一系列的Mapper和Reducer来完成
- 任务过程分为两个阶段,map和reduce阶段,两个阶段都是使用键值对(key/value)作为输入输出的
- 每个Mapper处理一个Split,每个split对应一个map线程。Split中的数据作为map的输入,map的输出一定在map端
- Map方法:Map(k1,v1) -> list(k2,v2) ,并行应用于每一个输入的数据集,每一次调用都会产生一个(k2,v2)的队列 。
- Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端输出队列list(k2,v2)中有相同key的数据对,把它们聚集在一起,输出时形成目的数据 list(k3,v3)。
- 新旧版本API的区别:
- 新的api放在:org.apache.hadoop.mapreduce,旧版api放在:org.apache.hadoop.mapred
- 新API使用虚类,旧版使用的是接口,虚类更加利于扩展
三、运行机制
-
输入分片(input split)
map计算之前,MapReduce会根据输入文件计算输入分片(input -> spliting),每个input split针对一个map任务。split存储的并不是数据,而是一个分片长度和一个记录数据的位置的数组
-
map阶段
map阶段的操作一般都是在数据存储节点上操作,所以有时候为了能够减轻数据传输的网络压力,可以先combiner阶段处理一下数据,在进行reduce
-
combiner阶段
此阶段是可选的,不是必须经过的一个阶段,combiner其实也是一种reduce操作,可以说combiner是一种本地化的reduce操作,是map运算的后续操作,可以减轻网络传输的压力。但是combiner的使用需要注意不要影响到reduce的最终结果,比如计算平均值的时候如果使用combiner就会影响最终的结果,但是计算总数的话则对最终结果没影响
-
shuffle阶段
将map的输出作为reduce的输入,这个过程就是shuffle,是MapReduce优化的重要阶段。
-
reduce阶段
reducer阶段,输入是shuffle阶段的输出,对每个不同的键和该键对应的值的数据流进行独立、并行的处理。
四、WordCount--官方提供的example
代码
package com.smile.test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
//hdfs输出路径
private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
// Text 实现了BinaryComparable类可以作为key值
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 解析键值对
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
String[] paths = {INPUT_PATH,OUTPUT_PATH};
//获得Configuration配置 Configuration: core-default.xml, core-site.xml
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
// 设置Mapper类
job.setMapperClass(TokenizerMapper.class);
// 设置Combiner类
job.setCombinerClass(IntSumReducer.class);
// 设置Reduce类
job.setReducerClass(IntSumReducer.class);
// 设置输出key的类型,注意跟reduce的输出类型保持一致
job.setOutputKeyClass(Text.class);
// 设置输出value的类型,注意跟reduce的输出类型保持一致
job.setOutputValueClass(IntWritable.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 设置输出路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
解析
MapReduce的输出路径一定要保证文件夹不存在,最好的解决方法时在代码中添加判断,执行之前删除output文件夹(具体方法见下面的hdfs操作)
MapReduce可以没有输出,但必须设置输出路径
-
MapReduce的输入路径可以直接写hdfs的目录路径,然后放在集群下执行,
hadoop jar **.jar java类名 参数1 参数2 ...
-
Mapper
//map public void map(Object key, Text value, Context context)
前面两个参数分别是输入的key,value,Context context可以记录输入的key和value,context也可以记录map运算的状态
map中的context记录了map执行的上下文,在mapper类中,context可以存储一些job conf的信息,也就是说context是作为参数传递的载体。比如runner中configuration的set信息[conf.set(Str, strValue)
],map中可以get到[context.getConfiguration().get(Str)
]//setup protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) //cleanup protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
MapReduce框架内的setup和cleanup方法只会执行一次,所以一些相关变量或者是资源的初始化和释放最好是在setup中执行,如果放在map中执行,则在解析每一行数据的时候都会执行一次,严重影响程序运行效率。
-
Reducer
public void reduce(Text key, Iterable<IntWritable> values, Context context)
reduce的输入也是key/value形式,不过是values,也就是一个key对应的一组value,例如key,value1;key,value2...
reducer不是必须的,如果用不到reducer阶段可以不写reduce会接收到不同map传递过来的数据 ,并且每个map传递过来的数据都是有序的。如果reduce端接收到的数据量比较小,那么会存储在内存中,如果超出缓冲区大小一定比例,则会合并后写到磁盘上
-
调用 runner
Configuration conf = new Configuration(); //连接hbase,操作hbase Configuration conf = HBaseConfiguration.create();
MapReduce运行之前都要初始化Configuration,主要是读取MapReduce系统配置,如core-site.xml、hdfs-site.xml、mapred-site.xml、hbase-site.xml
scan.setCaching(500);
增加缓存读取条数(一次RPC调用返回多行纪录,也就是每次从服务器端读取的行数),加快scanner读取速度,但耗费内存增加,设太大会响应慢、超时或者OOM。
setBatch(int batch)
设置获取纪录的列个数,默认无限制,也就是返回所有的列。实际上就是控制一次next()传输多少个columns,如batch为5表示每个result实例返回5个columns
setBatch使用场景为,用客户端的scanner缓存进行批量交互从而提高性能时,非常大的行可能无法放入客户端的内存,这时需要用HBase客户端API中进行batching处理。
scan.setCacheBlocks(false);
默认是true,分内存,缓存和磁盘,三种方式,一般数据的读取为内存->缓存->磁盘;
setCacheBlocks不适合MapReduce工作:
MR程序为非热点数据,不需要缓存,因为Blockcache is LRU,也就是最近最少访问算法(扔掉最少访问的),那么,前一个请求(比如map读取)读入Blockcache的所有记录在后一个请求(新的map读取)中都没有用,就必须全部被swap,那么RegionServer要不断的进行无意义的swapping data,也就是无意义的输入和输出BlockCache,增加了无必要的IO。而普通读取时局部查找,或者查找最热数据时,会有提升性能的帮助。
runner方法中可以写定义多个job,job会顺序执行。
五、常用hadoop fs命令 (类似Linux的文件操作命令,可类比学习使用)
-help
功能:输出这个命令参数手册
-ls
功能:显示目录信息
示例: hadoop fs -ls /yjq
-mkdir
功能:在hdfs上创建目录
示例:hadoop fs -mkdir -p /yjq/test
-moveFromLocal
功能:从本地剪切粘贴到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test
-moveToLocal
功能:从hdfs剪切粘贴到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/
-copyFromLocal
功能:从本地文件系统中拷贝文件到hdfs路径去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test
-copyToLocal
功能:从hdfs拷贝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/
-get
功能:等同于copyToLocal,从hdfs下载文件到本地路径(.表示当前路径)
示例:hadoop fs -get /yjq/test/a.txt .
-getmerge
功能:合并下载多个文件
示例:将目录下所有的TXT文件下载到本地,并合并成一个文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt
-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test
-cp
功能:从hdfs的一个路径拷贝hdfs的另一个路径
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/
-mv
功能:在hdfs目录中移动文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/
-appendToFile
功能:追加一个文件到已经存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt
-cat
功能:显示文件内容
示例:hadoop fs -cat /yjq/test1/a.txt
-tail
功能:显示一个文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt
-text
功能:以字符形式打印一个文件的内容
示例:hadoop fs -text /yjq/test1/a.txt
-chgrp、-chmod、-chown
功能:修改文件所属权限
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh为用户名,hadoop为用户组
hadoop fs -chown cdh:group /yjq/test1/a.txt
-rm
功能:删除文件或文件夹
示例:hadoop fs -rm -r /yjq/test/a.txt
-df
功能:统计文件系统的可用空间信息
示例:hadoop fs -df -h /
-du
功能:统计文件夹的大小信息
示例:
hadoop fs -du -s -h /yjq/*
-count
功能:统计一个指定目录下的文件节点数量
示例:hadoop fs -count /yjq/
六、HBase 相关操作
- 简介
- HBase是一个分布式的、面向列的开源数据库
- 表由行和列组成,列划分为多个列族/列簇(column family)
- RowKey:是Byte array,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
- Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
- Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加
-
编码
Configuration conf = HBaseConfiguration.create();
会自动读取hbase-site.xml配置文件
Scan scan = new Scan(); scan.setCaching(1000); scan.setStartRow(getBytes(startDate)); scan.setStopRow(getBytes(endDate)); TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
参数:hbase table name,scan,mapper class,outputKeyClass,outputValueClass,job
七、hdfs操作
-
运算之前清除hdfs上的文件夹
FileSystem fs = FileSystem.get(new Configuration()); Path outputDir = new Path(OUTPUT_PATH); //运算之前如果文件夹存在则清除文件夹 if(fs.exists(outputDir)) fs.delete(outputDir, true);
-
HDFS读流程
- 客户端向NameNode发起读数据请求
- NameNode找出距离最近的DataNode节点信息
- 客户端从DataNode分块下载文件
-
HDFS写流程
- 客户端向NameNode发起写数据请求
- 分块写入DataNode节点,DataNode自动完成副本备份
- DataNode向NameNode汇报存储完成,NameNode通知客户端
八、多表操作
MultiTableInputFormat 支持多个mapper的输出混合到一个shuffle,一个reducer,其中每个mapper拥有不同的inputFormat和mapper处理类。
所有的mapper需要输出相同的数据类型,对于输出value,需要标记该value来源,以便reducer识别
List<Scan> scans = new ArrayList<Scan>();
Scan scan1 = new Scan();
scan1.setCaching(100);
scan1.setCacheBlocks(false);
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());
scans.add(scan1);
Scan scan2 = new Scan();
scan2.setCaching(100);
scan2.setCacheBlocks(false);
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());
scans.add(scan2);
TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);
九、错误处理
-
ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException
这是当从服务器传输数据到客户端的时间,或者客户端处理数据的时间大于了scanner设置的超时时间,scanner超时报错,可在客户端代码中设置超时时间
Configuration conf = HBaseConfiguration.create() conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000)
如果Mapper阶段对每条数据的处理时间过长,可以将scan.setCaching(1000)的值设置小一点,如果值设置太大,则处理时间会很长就会出现超时错误。
写在最后
很久之前写的学习笔记了,资料来源网络及项目组内的讨论,参考文献就不一一标注了,侵删~
如果您觉得本文对您有帮助,点个赞吧~~