1. 简介
Hadoop Mapreduce是一个易于编程并且能在大型集群(上千节点)快速地并行得处理大量数据的软件框架,以可靠,容错的方式部署在商用机器上。
MapReduce作业通常将输入数据集分成独立的块,由map任务以完全平行的方式进行处理。框架对map的输出进行排序,然后输入到reduce任务。 通常,作业的输入和输出都存储在文件系统中。 该框架负责调度任务,监控它们并重新执行失败的任务。
通常,计算节点和存储节点是相同的,即MapReduce框架和Hadoop分布式文件系统在同一组节点上运行。 该配置允许框架在数据已经存在的节点上有效地调度任务,从而在整个集群中产生非常高的聚合带宽。
MapReduce框架由单个主ResourceManager,每个集群节点的一个从属NodeManager和每个应用程序的MRAppMaster组成。
Hadoop 客户端提交Job和配置信息给ResourceManger,它将负责把配置信息分配给从属节点,调度任务并且监控它们,把状态信息和诊断信息传输给客户端。
2. Inputs and Outputs
MapReduce 框架只操作键值对,MapReduce 将job的不同类型输入当做键值对来处理并且生成一组键值对作为输出。
Key和Value类必须通过实现Writable接口来实现序列化。此外,Key类必须实现WritableComparable 来使得排序更简单。
MapReduce作业的输入和输出类型:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
3. Mapper、Reducer、Partitioner、Counter、Combiner
应用通常实现Mapper和Reducer接口提供map和reduce方法。这是Job的核心代码。
- Mapper
Mappers将输入的键值对转换成中间键值对。
Maps是多个单独执行的任务将输入转换成中间记录。那些被转换的中间记录不一定要和输入的记录为相同类型。输入键值对可以在map后输出0或者更多的键值对。MapReduce 会根据 InputFormat 切分成的各个 InputSplit 都创建一个map任务。所有的中间值都会按照Key进行排序,然后传输给一个特定的Reducer做最后确定的输出。maps的数量通常依赖于输入数据的总长度,也就是,输入文档的总block数。每个节点map的正常并行度应该在10-100之间。 - Reducer
Reduce处理一系列相同key的中间记录。Reducer有3个主要阶段:混洗(Shuffle)、排序(Sort)和reduce。
Shuffle - 输出到Reducer的数据都在Mapper阶段经过排序的。在这个阶段框架将通过HTTP从恰当的Mapper的分区中取得数据。
Sort - 这个阶段框架将对输入到的 Reducer 的数据通过key(不同的 Mapper 可能输出相同的key)进行分组。混洗和排序阶段是同时进行;map的输出数据被获取时会进行合并。
Reduce - 在这个阶段reduce方法将会被调用来处理每个已经分好的组键值对。Reducer 输出的数据是不经过排序的。
合适的 reduce 总数应该在 节点数每个节点的容器数0.95 至 节点数每个节点的容器数1.75 之间。当设定值为0.95时,map任务结束后所有的 reduce 将会立刻启动并且开始转移数据,当设定值为1.75时,处理更多任务的时候将会快速地一轮又一轮地运行 reduce 达到负载均衡。Reduce 的数目的增加将会增加框架的负担,但是会提高负载均衡和降低失败率。当没有 reduction 需求的时候可以将 reduce-task 的数目设置为0,是允许的。 - Partitioner
Partitioner对key进行分区。Partitioner 对 map 输出的中间值的 key(Reducer之前)进行分区。分区采用的默认方法是对 key 取 hashcode。分区数等于 job 的 reduce 任务数。因此这会根据中间值的key 将数据传输到对应的 reduce。HashPartitioner 是默认的的分区器。 - Counter
计数器是一个工具用于报告 Mapreduce 应用的统计。
Mapper 和 Reducer 实现类可使用计数器来报告统计值。
Hadoop Mapreduce 是普遍的可用的 Mappers、Reducers 和 Partitioners 组成的一个库。 - Combiner
在Mapper和Reducer之间有一个非常重要的组件,它就是Combiner。并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
4. WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}