mapreduce是什么?
是一个编程模型, 分为map和reduce. map接受一条record, 将这条record进行各种想要得到的变换输出为中间结果, 而reduce把key相同的中间结果放在一起(key, iterable value list), 进行聚合输出0个或者1个结果.
mapreduce(mr)不是什么
mr不是一个新概念, mr来自函数式编程中已有的概念. Google对mr做出的贡献不在于创造了这个编程模板, 而是把mr整合到分布式的存储和任务管理中去, 实现分布式的计算. 所以就mr而言,重点并不在这个编程模板上, 而是如何通过分布式去实现mr的. 这是我接下来要关注的重点.
一个mr过程的overview:
通过分割[1], 输入数据变成一个有M个split的子集(每一个split从16M到64M不等[2]). map函数被分布到多台服务器上去执行map任务. 使得输入的split能够在不同的机器上被并行处理.
map函数的输出通过用split函数来分割中间key, 来形成R个partition(例如, hash(key) mod R), 然后reduce调用被分布到多态机器上去. partition的数据和分割函数由用户来指定.
一个mr的完整过程:
1> mr的库首先分割输入文件成M个片, 然后再集群中开始大量的copy程序
2> 这些copy中有一个特殊的: 是master. 其它的都是worker. 有M个map任务和R个reduce任务将被分配. mater会把一个map任务或者是一个reduce任务分配给idle worker(空闲机器).
3> 一个被分配了map任务的worker读取相关输入split的内容. 它从输入数据中分析出key/value pair, 然后把key/value对传递给用户自定义的map函数, 有map函数产生的中间key/value pair被缓存在内存中
4> 缓存到内存的中kv paoir会被周期性的写入本地磁盘上. 怎么写? 通过partitioning function把他们写入R个分区. 这些buffered pair在本地磁盘的位置会被传回给master. master会在后面把这个位置转发给reduce的worker.
5> 当reduce的worker接收到master发来的位置信息后, 它通过远程访问来读map worker溢写到磁盘上的数据. 当reduce worker把所有的中间结果都读完了以后, 它要根据中间结果的key做一个sort --> 这样的话, key相同的record会被group到一起. 这个sort是必须的, 因为通常相同的reduce task会收到很多不同的key(如果不做sort, 就没法把key相同的record group在一起了). 如果中间结果太大超过了内存容量, 需要做一个外部的sort.
6> reducer worker会对每一个unique key进行一次遍历, 把每一个unique key和它corresponding的value list传送给用户定义的reduce function中去. reduce的输出被append到这个reduce的partition的最终的输出文件中去
7> 当所有的map任务和reduce任务都完成后, master结点会唤醒user program. 这个时候, 在user program中的对mapreduce的call会返回到用户的code中去.
最终, mr执行的输出会被分到R个输出文件中去(每个reduce输出一个partition, 共R个.) 通常来讲, 用户不需要把这R个输出文件合并成一个, 因为他们经常会被作为下一个mapreduce程序的输入. 或者是通过别的程序来调用他们, 这个程序必须可以handle有多个partition作为输入的情况.
master的数据结构:
master维护的主要是metadata. 它为每一个map和reduce任务存储他们的状态(idle, in-progress,
or completed).
master就像一个管道,通过它,中间文件区域的位置从map任务传递到reduce任务.因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置.当map任务完成的时候,位置和大小的更新信息被接受.这些信息被逐步增加的传递给那些正在工作的reduce任务.
Fault Tolerance
错误分为2中 worker的故障和master的故障.
worker故障:
master会周期性的ping每个worker. 如果在一个缺点的时间段内没有收到worker返回的信息, master会把这个worker标记成失效. 失败的任务是如何重做的呢? 每一个worker完成的map任务会被reset为idle的状态, 所以它可以被安排给其它的worker. 对于一个failed掉的worker上的map任务和reduce任务, 也通同样可以通过这种方式来处理.
master失败:
master只有一个, 它的失败会造成single point failure. 就是说, 如果master失败, 就会终止mr计算. 让用户来检查这个状态, 根据需要重新执行mr操作.
在错误面前的处理机制(类似于exactly once?)
当map当user提供的map和reduce operator是关于输入的确定性的操作, 我们提供的分布式implementation能够提供相同的输出. 什么相同的输出呢? 和一个非容错的顺序执行的程序一样的输出. 是如何做到这一点的?
是依赖于map和reduce任务输出的原子性提交来实现这个性质的. 对所有的task而言, task会把输出写到private temporary files中去. 一个map任务会产生R个这样的临时文件, 一个reduce任务会产生1个这样的临时文件. 当map任务完成的时候, worker会给master发一个信息, 这个信息包含了R个临时文件的name. 如果master收到了一条已经完成的map任务的新的完成信息, master会忽略这个信息.否则的话, master会纪录这R个文件的名字到自己的data structure中去.
当reduce任务完成了, reduce worker会自动把自己输出的临时文件重命名为final output file. 如果相同的在多态机器上执行, 那么在相同的final output file上都会执行重命名. 通过这种方式来保证最终的输出文件只包含被一个reduce task执行过的数据.
存储位置
mr是如果利用网络带宽的?
论文中说, 利用把输入数据(HDFS中)存储在机器的本地磁盘来save网络带宽. HDFS把每个文件分成64MB的block. 然后每个block在别的机器上做replica(一般是3份). 做mr时, master会考虑输入文件的位置信息, 并努力在某个机器上安排一个map任务.什么样的机器? 包含了这个map任务的数据的replica的机器上. 如果失败的话, 则尝试就近安排(比如安排到一个worker machine上, 这个machine和包含input data的machine在同一个network switch上), 这样的话, 想使得大部分输入数据在本地读取, 不消耗网络带宽.
任务粒度
把map的输入拆成了M个partition, 把reduce的输入拆分成R个partition. 因为R通常是用户指定的,所以我们设定M的值. 让每一个partition都在16-64MB(对应于HDFS的存储策略, 每一个block是64MB) 另外, 经常把R的值设置成worker数量的小的倍数.
备用任务
straggler(落伍者): 一个mr的总的执行时间总是由落伍者决定的. 导致一台machine 慢的原因有很多:可能硬盘出了问题, 可能是key的分配出了问题等等. 这里通过一个通用的用的机制来处理这个情况:
当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著。
技巧
partition 函数
map的输出会划分到R个partition中去. 默认的partition的方法是使用hash进行分区. 然而有时候, hash不能满足我们的需求. 比如: 输出的key的值是URLs, 我们希望每个主机的所有条目保持在同一个partition中, 那么我们就要自己写一个分区函数, 如hash(Hostname(urlkey) mod R)顺序保证
我们确保在给定的partition中, 中间的kv pair的值增量顺序处理的. 这样的顺序保证对每个partition生成一个有序的输出文件.Combiner函数
在某些情况下,Map函数产生的中间key值的重复数据会占很大的比重. 如果把这些重复的keybu'zu我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。
Combiner函数在每台执行Map任务的机器上都会被执行一次。因此combiner是map侧的一个reduce. 一般情况下,Combiner和Reduce函数是一样的。Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里,然后被发送给Reduce任务。输入输出类型
支持多种. 比如文本的话, key是offset, value是这一行的内容. 每种输入类型的竖线都必须能够把输入数据分割成split. 这个split能够由单独的map任务来进行后续处理. 使用者可以通过提供一个reader接口的实现来支持新的输入类型. 而且reader不一定需要从文件中读取数据.跳过损耗的纪录
有时候, 用户程序中的bug导致map或者reduce在处理某些record的时候crash掉. 我们提供一种忽略这些record的模式, mr会检测检测哪些记录导致确定性的crash,并且跳过这些记录不处理。
具体做法是: 在执行MR操作之前, MR库会通过全局变量保存record的sequence number, 如果用户程序出发了一个系统信号, 消息处理函数将用"最后一口气" 通过UDP包向master发送处理的最后一条纪录的序号. 当master看到在处理某条特定的record不止失败一次时, 就对它进行标记需要被跳过, 在下次重新执行相关的mr任务的时候跳过这条纪录.
在Google给的例子中, 有一点值得注意.
通过benchmark的测试, 能知道key的分区情况. 而通常对于需要排序的程序来说, 会增加一个预处理的mapreduce操作用于采样key值的分布情况. 通过采样的数据来计算对最终排序处理的分区点.
当时最成功的应用: 重写了Google网络搜索服务所使用到的index系统
总结: mr的牛逼之处在于:
1> MapReduce封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得MapReduce库易于使用。
2> 编程模板好. 大量不同类型的问题都可以通过MapReduce简单的解决。
3> 部署方便.
总结的经验:
1> 约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境(暂时不懂)
2> 网络带宽是稀有资源, 大量的系统优化是针对减少网络传输量为目的的: 本地优化策略使得大量的数据从本地磁盘读取, 中间文件写入本地磁盘, 并且只写一份中间文件.
3> 多次执行相同的任务可以减少性能缓慢的机器带来的负面影响,同时解决了由于机器失效导致的数据丢失问题。
关于shuffle, combiner 和partition
shuffle: 从map写出开始到reduce执行之前的过程可以统一称为shuffle. 具体可以分为map端的shuffle和reduce端的shuffle.
combiner和partition: 都是在map端的.
具体过程:
- Collect阶段
1> 在map()端, 最后一步通过context.write(key,value)输出map处理的中间结果. 然后调用partitioner.getPartiton(key, value, numPartitions)来取得这条record的分区号. record 从kv pair(k,v) -->变为 (k,v,partition).
2> 将变换后的record暂时保存在内存中的MapOutputBuffer内部的环形数据缓冲区(默认大小是100MB, 可以通过参数io.sort.mb调整, 设置这个缓存是为了排序速度提高, 减少IO开销). 当缓冲区的数据使用率到达一定阈值后, 触发一次spill操作. 将环形缓冲区的部分数据写到磁盘上, 生成一个临时的linux本地数据的spill文件, 在缓冲区的使用率再次达到阈值后, 再次生成一个spill文件. 直到数据处理完毕, 在磁盘上会生成很多临时文件.
关于缓冲区的结构先不讨论
2.spill阶段
当缓冲区的使用率到达一定阈值后(默认是80%, 为什么要设置比例, 因为要让写和读同时进行), 出发一次"spill", 将一部分缓冲区的数据写到本地磁盘(而不是HDFS).
特别注意: 在将数据写入磁盘前, 会对这一部分数据进行sort. 默认是使用QuickSort.先按(key,value,partition)中的partition分区号排序,然后再按key排序. 如果设置了对中间数据做压缩的配置还会做压缩操作.
注:当达到溢出条件后,比如默认的是0.8,则会读出80M的数据,根据之前的分区元数据,按照分区号进行排序,这样就可实现同一分区的数据都在一起,然后再根据map输出的key进行排序。最后实现溢出的文件内是分区的,且分区内是有序的
3.Merge阶段
map输出数据比较多的时候,会生成多个溢出文件,任务完成的最后一件事情就是把这些文件合并为一个大文件。合并的过程中一定会做merge操作,可能会做combine操作。
merge与combine的对比:
在map侧可能有2次combine. 在spill出去之前, 会combine一次(在user设置的前提下). 如果map的溢写文件个数大于3时(可配置:min.num.spills.for.combine)在merge的过程中(多个spill文件合并为一个大文件)中还会执行combine操作.
Combine: a:1,a:2 ---> a:3
Merge: a:1,a:2 ---> a,[1,2]
Reducer端: copy, sort, reduce
4.copy
copy的过程是指reduce尝试从完成的map中copy该reduce对应的partition的部分数据.
什么时候开始做copy呢? 等job的第一个map结束后就开始copy的过程了.因为对每一个map,都根据你reduce的数将map的输出结果分成R个partition. 所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的. 由于每一个map产生的中间结果都有可能包含某个reduce所在的partition的数据, 所以这个copy是从多个map并行copy的(默认是5个).
注: 这里因为网络问题down失败了怎么办? 重试, 在一定时间后若仍然失败, 那么下载现成就会放弃这次下载, 随后尝试从别的地方下载.
5.merge
Reduce将map结果下载到本地时,同样也是需要进行merge的所以io.sort.factor的配置选项同样会影响reduce进行merge时的行为.
当发现reduce在shuffle阶段iowait非常的高的时候,就有可能通过调大这个参数来加大一次merge时的并发吞吐,优化reduce效率。
(copy到哪儿, 先是内存的buffer, 然后是disk) reduce在shuffle阶段对下载下来的map数据也不是立刻写入磁盘, 而是先用一个buffer存在内存中. 然后当使用内存达到一定量的时候才spill到磁盘. 这个百分比是通过另一个参数来控制.
reduce端的merge不是等所有溢写完成后再merge的. 而是一边copy一边sort一边merge. 在执行完merge sort后, reduce task会将数据交给reduce()方法进行处理
参考: