MapReduce

Reference: https://github.com/wangzhiwubigdata/God-Of-BigData/blob/master/%E5%A4%A7%E6%95%B0%E6%8D%AE%E6%A1%86%E6%9E%B6%E5%AD%A6%E4%B9%A0/Hadoop-MapReduce.md

MapReduce Overview

A distributed computing framework.
A MapReduce job consists of a number of:
– Map tasks (a map task performs data transformation)
– Reduce tasks (a reduce task combines the results of map tasks)
– (Internally) shuffle tasks (a shuffle task sends the output of map tasks to the right reduce tasks)

A simplified flow (explained in detail later):
input data -> blocks in hdfs -> map function in parallel -> reduce -> output

The MapReduce 1.x workflow (the original processing flow; from 2.x onward, YARN is used instead)

hadoop cluster.png

JobTracker ===> cluster resource management and job scheduling, plus communication with clients

  • Only one job tracker in a Hadoop cluster
  • Takes requests from clients (MapReduce programs)
  • Asks the name node for the location of the data
  • Assigns tasks to task trackers near the data
  • Reassigns tasks if they fail

TaskTracker ===> reports heartbeats and executes assigned tasks

  • Accepts (map, reduce, shuffle) tasks from the job tracker
  • Sends heartbeats to the job tracker: "I am alive"
  • Monitors the status of tasks and notifies the job tracker
JobTracker & TaskTracker.png

Starting from the map and reduce functions

Using map and reduce in Python

https://www.runoob.com/python/python-func-map.html

The map function: map(function, iterable, ...) (in Python 3, map returns an iterator, so wrap it in list() to materialize the results)
>>> list(map(lambda x: x ** 2, [1, 2, 3, 4, 5]))
[1, 4, 9, 16, 25]
# With two lists, the elements at the same position are added together
>>> list(map(lambda x, y: x + y, [1, 3, 5, 7, 9], [2, 4, 6, 8, 10]))
[3, 7, 11, 15, 19]

The reduce function: reduce(function, iterable[, initializer]) (in Python 3, reduce lives in functools)
>>> from functools import reduce
>>> reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])
15

Using map and reduce in MapReduce

  • Both map and reduce take <k, v> key-value pairs as input and produce them as output
  • The output of map becomes the input of reduce (with a group-by-key step in between)
  • The system groups the intermediate key-value pairs from map tasks by key
    E.g., <hello, 1> <hello, 1> <hello, 1> <this, 1> => <hello, [1, 1, 1]>, <this, [1]>
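Although Hadoop implements this step in Java inside the framework, the group-by-key behavior can be sketched in a few lines of Python (the function name here is illustrative, not a Hadoop API):

```python
from collections import defaultdict

def group_by_key(pairs):
    """Group intermediate (key, value) pairs by key, as the framework
    does between the map and reduce phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

pairs = [("hello", 1), ("hello", 1), ("hello", 1), ("this", 1)]
print(group_by_key(pairs))  # {'hello': [1, 1, 1], 'this': [1]}
```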
MapReduce diagram.png

MapReduce in parallel.png

In Hadoop

  • A node may run multiple map/reduce tasks
  • Typically, one map task per input split (chunk of data)
  • One reduce task per partition of the map output, e.g., partitioned by key range or by hashing

A WordCount example:

word.txt
hello
hello world

Map function:

  • Input: <offset of line, line> // line = a line of text in a document => {0: "hello", 6: "hello world"}
  • Output: for each word in the line, output <word, 1> => [("hello", 1), ("hello", 1), ("world", 1)]

Reduce function:
– Input: <word, list of 1's> => {"hello": [1, 1], "world": [1]} (so strictly speaking, the reduce input is not the raw map output but its grouped version)
– Output: <word, count> where count is the number of 1's in the input list =>
{"hello":2,"world":1}
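The whole WordCount job above can be simulated end to end in plain Python (a teaching sketch; real Hadoop jobs run these functions in Java across many nodes, and the names map_fn/reduce_fn are made up):

```python
from collections import defaultdict

def map_fn(offset, line):
    # Input: <offset of line, line>; emit <word, 1> for each word
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):
    # Input: <word, list of 1's>; output <word, count>
    return (word, sum(counts))

# Simulate the job on word.txt from above
lines = {0: "hello", 6: "hello world"}
intermediate = []
for offset, line in lines.items():
    intermediate.extend(map_fn(offset, line))

# The framework's group-by-key step between map and reduce
groups = defaultdict(list)
for word, one in intermediate:
    groups[word].append(one)

result = dict(reduce_fn(word, counts) for word, counts in groups.items())
print(result)  # {'hello': 2, 'world': 1}
```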

The Java implementation of Map and Reduce

The MapReduce framework is largely boilerplate: we only need to write the map and reduce functions, and the framework takes care of all the other details. In Java these are encapsulated in the Mapper and Reducer classes, so we simply write subclasses that extend them and override the corresponding methods.

Each map task runs an instance of Mapper
– Mapper has a map function
– Map task invokes the map function of the Mapper once for each input key‐value pair

Mapper.png

Each reduce task runs an instance of Reducer
– Reducer has a reduce function
– Reduce task invokes the reduce function of the Reducer once for every different intermediate key
– For the reduce function, values are NOT in any particular order

Reducer.png

The details after map and before reduce --- shuffling

Shuffle

  • The process of distributing intermediate key-values to the right reduce tasks
  • It is the only communication between map and reduce tasks
    • Individual map tasks do not exchange data directly with other map tasks
    • They are not even aware of the existence of their peers

Map tasks may run in parallel on different machines, so shuffling is needed to distribute all records with the same key to the same node for merging; only then can the final result be computed. This step also turns each <k, v> produced by map into <k, [v1, v2, ...]> for the subsequent reduce to operate on.


Shuffle.png

Image source:
https://xinze.fun/2020/01/10/Spark-Shuffle-%E5%92%8C-Spill-%E7%9A%84%E5%8C%BA%E5%88%AB/

Internals of shuffling
1. Map side: partition, sort, spill & merge

  • Partition the data in the buffer into R parts (R = number of reduce tasks, so number of partitions = number of reduce tasks)
  • Sort the data in each partition by key
  • Spill/write the data in the buffer to disk
  • Merge the spills
  • Notify the job tracker: output complete
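The map-side partition-and-sort steps can be sketched in Python (a simplified illustration; the real buffer, spill, and merge machinery lives inside Hadoop's Java runtime, and the function names here are made up):

```python
def hash_partition(key, num_reducers):
    # The same key always maps to the same reduce task
    return hash(key) % num_reducers

def partition_and_sort(pairs, num_reducers):
    """Map-side shuffle: split intermediate pairs into one partition per
    reduce task, then sort each partition by key."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[hash_partition(key, num_reducers)].append((key, value))
    return [sorted(part) for part in partitions]

pairs = [("cat", 1), ("dog", 1), ("cat", 1), ("ant", 1)]
parts = partition_and_sort(pairs, 2)
# All pairs sharing a key land in the same partition, each partition sorted
```

Note that Python's string hash is salted per process, so the exact partition indices vary between runs; what is guaranteed (and what MapReduce relies on) is that equal keys always land in the same partition within a job.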

2. Reduce side: fetch & merge

  • The task tracker is notified by the job tracker: data ready
  • Fetch/copy the data from the map side
  • Merge the data (some data may stay on disk once fetched, depending on the buffer size)
  • Figure out the groups from the sorted data
details about Shuffle.png

Back to the beginning: input and output data formats

mapreduce with 2 nodes.png

Input

InputFormat

  • Determines how input files are split and read
  • Defined in the Java interface InputFormat
  • Its job:
    • Split the input file into chunks called InputSplits (logical concepts)
    • Implement a RecordReader to read data from the splits

In Java, InputFormat is an abstract class with many subclasses for reading specific kinds of input:
• FileInputFormat (input from files in given dirs) --- the most commonly used
• DBInputFormat (input data from a database)
• CombineFileInputFormat (input data by combining multiple files)

FileInputFormat
– Takes paths to files
– Reads all files in the paths
– Divides each file into one or more InputSplits
FileInputFormat is itself also an abstract class, with subclasses implementing input for specific formats:
– TextInputFormat
– KeyValueTextInputFormat
– SequenceFileInputFormat

Subclasses of FileInputFormat.png

The first thing InputFormat does: split input files into InputSplits

A note on InputSplit: it is a logical division of the data, not a physical one.
• If a file is big, multiple splits may be created; the typical split size is 128 MB
• A map task is created for each split (a chunk of some input file)

The second thing InputFormat does: implement a RecordReader to read data from InputSplits

  • InputFormat defines an instance of a RecordReader (RR), e.g., TextInputFormat provides LineRecordReader
  • LineRecordReader
    • Forms a key-value pair for every line of the file
    • Data type for the key: LongWritable; for the value: Text
  • The reader is called repeatedly until all data in the split are processed
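The behavior of LineRecordReader, which produces the <offset of line, line> pairs seen in the WordCount example, can be mimicked in Python (an illustrative sketch; the real reader is a Java class and also handles split boundaries, which is omitted here):

```python
def line_records(data: bytes):
    """Yield (byte offset, line) pairs, mimicking LineRecordReader's
    <LongWritable, Text> key-value pairs."""
    offset = 0
    for raw in data.splitlines(keepends=True):
        yield offset, raw.rstrip(b"\n").decode()
        offset += len(raw)

print(list(line_records(b"hello\nhello world\n")))
# [(0, 'hello'), (6, 'hello world')]
```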

Output

OutputFormat

  • Defines the format of the output from Reducers (the output is stored in a file)
  • Defined in the Java interface OutputFormat
  • Implementation: FileOutputFormat
    • Subclasses: TextOutputFormat, SequenceFileOutputFormat


      Subclasses of OutputFormat.png
  • All Reducers write to the same directory
  • Set by the FileOutputFormat.setOutputPath() method
  • OutputFormat defines a RecordWriter which handles the write
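The default TextOutputFormat writes one "key<TAB>value" line per record into a part file in the output directory; that behavior can be sketched in Python (the function name and file name are illustrative only):

```python
import os
import tempfile

def write_text_output(records, path):
    """Write (key, value) records as tab-separated lines, mimicking the
    default TextOutputFormat."""
    with open(path, "w") as f:
        for key, value in records:
            f.write(f"{key}\t{value}\n")

# Demo: reducer output written the way a part file would look
out_path = os.path.join(tempfile.mkdtemp(), "part-r-00000")
write_text_output([("hello", 2), ("world", 1)], out_path)
print(open(out_path).read())
```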

An optional step: the Combiner

My take: a combiner is roughly a localized reducer

  • Runs on the node running the Mapper
    • Performs a local (or mini-) reduction
  • Combines Mapper results
    • Before they are sent to the Reducers
    • Reduces communication costs
    • E.g., a combiner may be used in WordCount: (cat, 1), (cat, 1), (cat, 1) => (cat, 3)

Note: the combiner is used to save bandwidth and speed up the computation, but it must not affect the final result; for example, computing an average cannot reuse the reduce function as a combiner.
More formally, the reduce function may be used directly as the combiner

  • if it is commutative and associative
  • meaning the operations can be grouped and performed in any order
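A WordCount-style combiner can be sketched in Python (illustrative only; in Hadoop this would be a Reducer subclass set via Job.setCombinerClass):

```python
from collections import defaultdict

def combine_counts(pairs):
    """Local (map-side) reduction: sum the counts per key before they are
    shuffled over the network. Safe here because addition is commutative
    and associative."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())

print(combine_counts([("cat", 1), ("cat", 1), ("cat", 1), ("dog", 1)]))
# [('cat', 3), ('dog', 1)]
```

The averaging counter-example: mean(mean(1, 2), 6) = 2.25 but mean(1, 2, 6) = 3, so a combiner that averages locally changes the final result; the standard workaround is to combine (sum, count) pairs instead and divide only in the final reduce.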