MapReduce体系结构指南

1. 简介

Hadoop Mapreduce是一个易于编程并且能在大型集群(上千节点)快速地并行得处理大量数据的软件框架,以可靠,容错的方式部署在商用机器上。

MapReduce作业通常将输入数据集分成独立的块,由map任务以完全平行的方式进行处理。框架对map的输出进行排序,然后输入到reduce任务。 通常,作业的输入和输出都存储在文件系统中。 该框架负责调度任务,监控它们并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即MapReduce框架和Hadoop分布式文件系统在同一组节点上运行。 该配置允许框架在数据已经存在的节点上有效地调度任务,从而在整个集群中产生非常高的聚合带宽。

MapReduce框架由单个主ResourceManager,每个集群节点的一个从属NodeManager和每个应用程序的MRAppMaster组成。

Hadoop 客户端提交Job和配置信息给ResourceManger,它将负责把配置信息分配给从属节点,调度任务并且监控它们,把状态信息和诊断信息传输给客户端。

2. Inputs and Outputs

MapReduce 框架只操作键值对,MapReduce 将job的不同类型输入当做键值对来处理并且生成一组键值对作为输出。

Key和Value类必须通过实现Writable接口来实现序列化。此外,Key类必须实现WritableComparable 来使得排序更简单。
MapReduce作业的输入和输出类型:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

3. Mapper、Reducer、Partitioner、Counter、Combiner

应用通常实现Mapper和Reducer接口提供map和reduce方法。这是Job的核心代码。

  1. Mapper
    Mappers将输入的键值对转换成中间键值对。
    Maps是多个单独执行的任务将输入转换成中间记录。那些被转换的中间记录不一定要和输入的记录为相同类型。输入键值对可以在map后输出0或者更多的键值对。MapReduce 会根据 InputFormat 切分成的各个 InputSplit 都创建一个map任务。所有的中间值都会按照Key进行排序,然后传输给一个特定的Reducer做最后确定的输出。maps的数量通常依赖于输入数据的总长度,也就是,输入文档的总block数。每个节点map的正常并行度应该在10-100之间。
  2. Reducer
    Reduce处理一系列相同key的中间记录。Reducer有3个主要阶段:混洗(Shuffle)、排序(Sort)和reduce。
    Shuffle - 输出到Reducer的数据都在Mapper阶段经过排序的。在这个阶段框架将通过HTTP从恰当的Mapper的分区中取得数据。
    Sort - 这个阶段框架将对输入到的 Reducer 的数据通过key(不同的 Mapper 可能输出相同的key)进行分组。混洗和排序阶段是同时进行;map的输出数据被获取时会进行合并。
    Reduce - 在这个阶段reduce方法将会被调用来处理每个已经分好的组键值对。Reducer 输出的数据是不经过排序的。
    合适的 reduce 总数应该在 节点数每个节点的容器数0.95 至 节点数每个节点的容器数1.75 之间。当设定值为0.95时,map任务结束后所有的 reduce 将会立刻启动并且开始转移数据,当设定值为1.75时,处理更多任务的时候将会快速地一轮又一轮地运行 reduce 达到负载均衡。Reduce 的数目的增加将会增加框架的负担,但是会提高负载均衡和降低失败率。当没有 reduction 需求的时候可以将 reduce-task 的数目设置为0,是允许的。
  3. Partitioner
    Partitioner对key进行分区。Partitioner 对 map 输出的中间值的 key(Reducer之前)进行分区。分区采用的默认方法是对 key 取 hashcode。分区数等于 job 的 reduce 任务数。因此这会根据中间值的key 将数据传输到对应的 reduce。HashPartitioner 是默认的的分区器。
  4. Counter
    计数器是一个工具用于报告 Mapreduce 应用的统计。
    Mapper 和 Reducer 实现类可使用计数器来报告统计值。
    Hadoop Mapreduce 是普遍的可用的 Mappers、Reducers 和 Partitioners 组成的一个库。
  5. Combiner
    在Mapper和Reducer之间有一个非常重要的组件,它就是Combiner。并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。

4. WordCount

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容