MapReduce原理和执行过程

MapReduce是一个用于处理海量数据的分布式计算框架。

  • 这个框架解决了 • 数据分布式存储
    •作业调度、
    • 容错、
    • 机器间通信等复杂问题

MapReduce的核心思想,分而治之

分:map
• 把复杂的问题分解为若干“简单的 任务”
合:reduce

上面这幅图就是mapreduce的工作原理

以词频统计为例。

词频统计就是统计一个单词在所有文本中出现的次数,在Hadoop中的事例程序就是wordcount,俗称hadoop编程的"hello world".因为我们有多个文本,所以可以并行的统计每个文本中单词出现的个数,然后最后进行合计。
所以这个可以很好地体现map,reduce的过程。

1)首先文档的数据记录(如文本中的行,或数据表格中的行)是以“键值对”的形式传入map 函数,然后map函数对这些键值对进行处理(如统计词频),然后输出到中间结果。

2)在键值对进入reduce进行处理之前,必须等到所有的map函数都做完,所以既为了达到这种同步又提高运行效率,在mapreduce中间的过程引入了barrier(同步障)
在负责同步的同时完成对map的中间结果的统计,包括 a. 对同一个map节点的相同key的value值进行合并,b. 之后将来自不同map的具有相同的key的键值对送到同一个reduce进行处理。

3)在reduce阶段,每个reduce节点得到的是从所有map节点传过来的具有相同的key的键值对。reduce节点对这些键值进行合并。

4)Combiner 节点负责完成上面提到的将同一个map中相同的key进行合并,避免重复传输,从而减少传输中的通信开销。

5)Partitioner节点负责将map产生的中间结果进行划分,确保相同的key到达同一个reduce节点.

编程模型

• 借鉴函数式的编程方式
• 用户只需要实现两个函数接口:
• Map(in_key, in_value)-> (out_key, intermediate_value) list
• Reduce (out_key, intermediate_value list) ->out_value list

两个重要的进程

  • JobTracker
    主进程,负责接收客户作业提交,调度任务到作节点上运行,并提供诸如监控工作节点状态及任务进度等 管理功能,一个MapReduce集群有一个jobtracker,一般运行在可靠的硬件上。
    • tasktracker是通过周期性的心跳来通知jobtracker其当前的健康状态,每一次心跳包含了可用的map和 reduce任务数目、占用的数目以及运行中的任务详细信息。Jobtracker利用一个线程池来同时处理心跳和 客户请求。
  • TaskTracker
    • 由jobtracker指派任务,实例化用户程序,在本地执行任务并周期性地向jobtracker汇报 状态。在每一个工 作节点上永远只会有一个tasktracker

• JobTracker一直在等待JobClient提交作业
• TaskTracker每隔3秒向JobTracker发送心跳询问有没有任务可做,如果有,让
其派发任务给它执行
• Slave主动向master拉生意

wordCount实例代码(python实现)

案例文件结构

run.sh文件
 HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"  
 STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"    //python代码实现,引入streaming

#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1. 
$HADOOP_CMD jar $STREAM_JAR_PATH \    //设置通过streaming方式提交
    -input $INPUT_FILE_PATH_1 \    //上面定义的,指向/1.data,原始文件(数据源)
    -output $OUTPUT_PATH \     //输出路径
    -mapper "python map_new.py" \    //指定如何执行map
    -reducer "python red_new.py" \    //指定如何执行reducer
    -file ./map_new.py \    //通过下面两个配置文件,把本地的代码分发到集群的map和Reducer上去
    -file ./red_new.py
a.txt
111
222
333
map_new.py
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')
    for word in ss:
            print '\t'.join([word.strip(), '1'])

cat a.txt | python map_new.py。通过管道的方式,a.txt 是数据源,标准输入到map。

red_new.py
import sys


cur_word = None
sum = 0

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
            continue
    word, cnt = ss

    if cur_word == None:
            cur_word = word

    if cur_word != word:
            print '\t'.join([cur_word, str(sum)])
            cur_word = word
            sum = 0

    sum += int(cnt)

print '\t'.join([cur_word, str(sum)])

head -2 The_Man_of_Property.txt | python map_new.py | sort -k1 | python red_new.py > result.local
这段代码模拟了数据进入map,中间sort之后再进入red,最后输出result.local

启动集群。进入hadoop/bin 目录下
./start-all.sh

然后hadoop fs -out 1.data / 把数据源上传到HDFS
bash run.sh执行streaming方式提交的脚本。

MapReduce计算框架-执行流程

File:

文件要存储在HDFS中,每个文件切分成多个一定大小(默认64M)的Block(默认3个备份)存储在多个节点(DataNode)上

文件数据内容:

We are studying at badou.\n
We are studyPinagrtiatitonbeardou.\n
......

InputFormat:

MR框架基础类之一
• 数据分割(Data Splits)
• S记pli录t读取Sp器lit(RSepcliotrdReader)
例子:
数据格定义,如果以“\n”分割每条记录,以空格区分一个目标单词
Shuffle “we are studying at badou.”为一条记录
“are”“at”等为一个目标单词

Split:

实际上每个split包含后一个Block中开头部分的数据(解决记录跨Block问题)
例子:
比如记录 “we are studing at badou./n"
跨越存储在两个Block中,那么这条记录属于前一个Block对应的split

RecordReader:(RR)

每读取一条记录,调用一次map函数
例子
比如,记录“we are studying at badou." 作为参数v,调用map(v)
然后继续这个过程,读取议案一条记录知道split尾部。

Map:

比如记录”we are studying at badou"
调用执行一次map("we are studying at badou")
在内存中增加数据:
{"we":1}
{"are":1}
......

Shuffle:

Partion,Sort,Spill,Meger,Combiner.......
神奇发生的地方,性能优化大有可为的地方!

Partitioner:

决定数据由哪个Reducer处理,从而分区
比如采用Hash法。

MemoryBuffer

内存缓冲区,每个map的结果和partition处理的key value结果都保存在缓存中

缓冲区大小:默认100M
溢写阈值:100M*0.8 = 80M

缓冲区中的数据:partition key value 三元组数据
{“1”, “are” : 1}
{“2”, “at” : 1}
{“1”, “we” : 1}

Spill:

内存缓冲区达到阈值时,溢写spill线程锁住这80M 的缓冲区,开始将数据写出到本地磁盘中,然后释 放内存。
每次溢写都生成一个数据文件。 溢出的数据到磁盘前会对数据进行key排序sort, 以及合并combiner
发送相同Reduce的key数量,会拼接到一起,减少 partition的索引数量。

Sort:

缓冲区数据按照key进行排序

Combiner:

数据合并,相同的key的数据,value值合并,减少输 出传输量 Combiner函数事实上是reducer函数,满足 combiner处理不影响{sum,max等}最终reduce的 结果时,可以极大提升性能
{“1”, “are”, 1} {“1”, “are”, 1} {“1”, “we”, 1}==》
{“1”, “are”, 2} {“1”, “we”, 1}

Reducer

多个reduce任务输入的数据都属于不同的partition,因此结果数据的key不会重复。
合并reduce的输出文件即可得到最终的结果。

MapReduce物理配置

• 文件句柄个数 – ulimit
• cpu

– 多核

• 内存

– 8G以上

• 合适的slot

– 单机map、reduce个数
– mapred.tasktracker.map.tasks.maximum(默认2)
– mapreduce.tasktracker.tasks.reduce.maximum(默认2) – 内存限制
– cpu核数-1
– 多机集群分离

• 磁盘情况

– 合适单机多磁盘
– mapred.local.dir和dfs.data.dir

• 确定map任务数时依次优先参考如下几个原则:

– 每个map任务使用的内存不超过800M,尽量在500M以下
– 每个map任务运行时间控制在大约20分钟,最好1-3分钟
– 每个map任务处理的最大数据量为一个HDFS块大小,一个map任务处理的输入不能跨文件
– map任务总数不超过平台可用的任务槽位

• 配置加载的问题

– 简单配置通过提交作业时-file分发
– 复杂较大配置
• 传入hdfs
• map中打开文件读取
• 建立内存结构

• map个数为split的份数
• 压缩文件不可切分
• 非压缩文件和sequence文件可以切分
• dfs.block.size决定block大小
• 确定reduce任务数时依次优先参考如下几个方面:

– 每个reduce任务使用的内存不超过800M,尽量在500M以下
– 每个reduce任务运行时间控制在大约20分钟,最好1-3分钟 – 整个reduce阶段的输入数据总量
– 每个reduce任务处理的数据量控制在500MB以内
– map任务数与reduce任务数的乘积
– 输出数据要求

• reduce个数设置

– mapred.reduce.tasks – 默认为1

• reduce个数太少 – 单次执行慢

– 出错再试成本高

• reduce个数太多 – shuffle开销大

– 输出大量小文件

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容