MapReduce是一个用于处理海量数据的分布式计算框架。
- 这个框架解决了 • 数据分布式存储
•作业调度、
• 容错、
• 机器间通信等复杂问题
MapReduce的核心思想,分而治之
分:map
• 把复杂的问题分解为若干“简单的 任务”
合:reduce
上面这幅图就是mapreduce的工作原理
以词频统计为例。
词频统计就是统计一个单词在所有文本中出现的次数,在Hadoop中的事例程序就是wordcount,俗称hadoop编程的"hello world".因为我们有多个文本,所以可以并行的统计每个文本中单词出现的个数,然后最后进行合计。
所以这个可以很好地体现map,reduce的过程。
1)首先文档的数据记录(如文本中的行,或数据表格中的行)是以“键值对”的形式传入map 函数,然后map函数对这些键值对进行处理(如统计词频),然后输出到中间结果。
2)在键值对进入reduce进行处理之前,必须等到所有的map函数都做完,所以既为了达到这种同步又提高运行效率,在mapreduce中间的过程引入了barrier(同步障)
在负责同步的同时完成对map的中间结果的统计,包括 a. 对同一个map节点的相同key的value值进行合并,b. 之后将来自不同map的具有相同的key的键值对送到同一个reduce进行处理。
3)在reduce阶段,每个reduce节点得到的是从所有map节点传过来的具有相同的key的键值对。reduce节点对这些键值进行合并。
4)Combiner 节点负责完成上面提到的将同一个map中相同的key进行合并,避免重复传输,从而减少传输中的通信开销。
5)Partitioner节点负责将map产生的中间结果进行划分,确保相同的key到达同一个reduce节点.
编程模型
• 借鉴函数式的编程方式
• 用户只需要实现两个函数接口:
• Map(in_key, in_value)-> (out_key, intermediate_value) list
• Reduce (out_key, intermediate_value list) ->out_value list
两个重要的进程
-
JobTracker
主进程,负责接收客户作业提交,调度任务到作节点上运行,并提供诸如监控工作节点状态及任务进度等 管理功能,一个MapReduce集群有一个jobtracker,一般运行在可靠的硬件上。
• tasktracker是通过周期性的心跳来通知jobtracker其当前的健康状态,每一次心跳包含了可用的map和 reduce任务数目、占用的数目以及运行中的任务详细信息。Jobtracker利用一个线程池来同时处理心跳和 客户请求。 -
TaskTracker
• 由jobtracker指派任务,实例化用户程序,在本地执行任务并周期性地向jobtracker汇报 状态。在每一个工 作节点上永远只会有一个tasktracker
• JobTracker一直在等待JobClient提交作业
• TaskTracker每隔3秒向JobTracker发送心跳询问有没有任务可做,如果有,让
其派发任务给它执行
• Slave主动向master拉生意
wordCount实例代码(python实现)
案例文件结构
run.sh文件
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar" //python代码实现,引入streaming
#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \ //设置通过streaming方式提交
-input $INPUT_FILE_PATH_1 \ //上面定义的,指向/1.data,原始文件(数据源)
-output $OUTPUT_PATH \ //输出路径
-mapper "python map_new.py" \ //指定如何执行map
-reducer "python red_new.py" \ //指定如何执行reducer
-file ./map_new.py \ //通过下面两个配置文件,把本地的代码分发到集群的map和Reducer上去
-file ./red_new.py
a.txt
111
222
333
map_new.py
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
cat a.txt | python map_new.py。通过管道的方式,a.txt 是数据源,标准输入到map。
red_new.py
import sys
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
head -2 The_Man_of_Property.txt | python map_new.py | sort -k1 | python red_new.py > result.local
这段代码模拟了数据进入map,中间sort之后再进入red,最后输出result.local
启动集群。进入hadoop/bin 目录下
./start-all.sh
然后hadoop fs -out 1.data / 把数据源上传到HDFS
bash run.sh执行streaming方式提交的脚本。
MapReduce计算框架-执行流程
File:
文件要存储在HDFS中,每个文件切分成多个一定大小(默认64M)的Block(默认3个备份)存储在多个节点(DataNode)上
文件数据内容:
We are studying at badou.\n
We are studyPinagrtiatitonbeardou.\n
......
InputFormat:
MR框架基础类之一
• 数据分割(Data Splits)
• S记pli录t读取Sp器lit(RSepcliotrdReader)
例子:
数据格定义,如果以“\n”分割每条记录,以空格区分一个目标单词
Shuffle “we are studying at badou.”为一条记录
“are”“at”等为一个目标单词
Split:
实际上每个split包含后一个Block中开头部分的数据(解决记录跨Block问题)
例子:
比如记录 “we are studing at badou./n"
跨越存储在两个Block中,那么这条记录属于前一个Block对应的split
RecordReader:(RR)
每读取一条记录,调用一次map函数
例子
比如,记录“we are studying at badou." 作为参数v,调用map(v)
然后继续这个过程,读取议案一条记录知道split尾部。
Map:
比如记录”we are studying at badou"
调用执行一次map("we are studying at badou")
在内存中增加数据:
{"we":1}
{"are":1}
......
Shuffle:
Partion,Sort,Spill,Meger,Combiner.......
神奇发生的地方,性能优化大有可为的地方!
Partitioner:
决定数据由哪个Reducer处理,从而分区
比如采用Hash法。
MemoryBuffer
内存缓冲区,每个map的结果和partition处理的key value结果都保存在缓存中
缓冲区大小:默认100M
溢写阈值:100M*0.8 = 80M
缓冲区中的数据:partition key value 三元组数据
{“1”, “are” : 1}
{“2”, “at” : 1}
{“1”, “we” : 1}
Spill:
内存缓冲区达到阈值时,溢写spill线程锁住这80M 的缓冲区,开始将数据写出到本地磁盘中,然后释 放内存。
每次溢写都生成一个数据文件。 溢出的数据到磁盘前会对数据进行key排序sort, 以及合并combiner
发送相同Reduce的key数量,会拼接到一起,减少 partition的索引数量。
Sort:
缓冲区数据按照key进行排序
Combiner:
数据合并,相同的key的数据,value值合并,减少输 出传输量 Combiner函数事实上是reducer函数,满足 combiner处理不影响{sum,max等}最终reduce的 结果时,可以极大提升性能
{“1”, “are”, 1} {“1”, “are”, 1} {“1”, “we”, 1}==》
{“1”, “are”, 2} {“1”, “we”, 1}
Reducer
多个reduce任务输入的数据都属于不同的partition,因此结果数据的key不会重复。
合并reduce的输出文件即可得到最终的结果。
MapReduce物理配置
• 文件句柄个数 – ulimit
• cpu
– 多核
• 内存
– 8G以上
• 合适的slot
– 单机map、reduce个数
– mapred.tasktracker.map.tasks.maximum(默认2)
– mapreduce.tasktracker.tasks.reduce.maximum(默认2) – 内存限制
– cpu核数-1
– 多机集群分离
• 磁盘情况
– 合适单机多磁盘
– mapred.local.dir和dfs.data.dir
• 确定map任务数时依次优先参考如下几个原则:
– 每个map任务使用的内存不超过800M,尽量在500M以下
– 每个map任务运行时间控制在大约20分钟,最好1-3分钟
– 每个map任务处理的最大数据量为一个HDFS块大小,一个map任务处理的输入不能跨文件
– map任务总数不超过平台可用的任务槽位
• 配置加载的问题
– 简单配置通过提交作业时-file分发
– 复杂较大配置
• 传入hdfs
• map中打开文件读取
• 建立内存结构
• map个数为split的份数
• 压缩文件不可切分
• 非压缩文件和sequence文件可以切分
• dfs.block.size决定block大小
• 确定reduce任务数时依次优先参考如下几个方面:
– 每个reduce任务使用的内存不超过800M,尽量在500M以下
– 每个reduce任务运行时间控制在大约20分钟,最好1-3分钟 – 整个reduce阶段的输入数据总量
– 每个reduce任务处理的数据量控制在500MB以内
– map任务数与reduce任务数的乘积
– 输出数据要求
• reduce个数设置
– mapred.reduce.tasks – 默认为1
• reduce个数太少 – 单次执行慢
– 出错再试成本高
• reduce个数太多 – shuffle开销大
– 输出大量小文件