15)MapReduce框架原理

数据切片和MapTask并行度决定机制

1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定

2)每一个Split切片分配一个MapTask并行实例处理

3)默认情况下,切片大小=BlockSize

4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片


FileInputFormat切片源码解析

1)程序先找到你数据存储的目录

2)开始遍历处理目录下的每一个文件

3)遍历第一个文件ss.txt

    a)获取文件大小fs.sizeOf(ss.txt)

    b)计算切片大小

computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

    c)默认情况下切片大小=blocksize块大小

    d)开始切片,ss.txt=300M

第一个切片:ss.txt——0:128M 

第二个切片:ss.txt——128:256M

第三个切片:ss.txt——256:300M

(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)

    e)将切片信息写到一个切片规划文件中

    f)整个切片的核心过程在getSplit()方法中完成

    g)InputSplit只记录了切片的元数据信息,比如起始位置,长度以及所在节点列表等

4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数


FileInputFormat切片大小的参数配置

1)源码中计算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize))

mapreduce.input.fileinputformat.split.minsize=1 默认值为1

mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值为Long的最大值

2)切片大小设置

maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值

minsize(切片最小值):参数调得比blockSize大,则可以让切片变得比blockSize还大

3)获取切片信息API

//获取切片的文件名称

String name=inputSplit.getPath().getName();

//根据文件类型获取切片信息

FileSplit inputSplit=(FileSplit)context.getInputSplit();


CombineTextInputFormat切片机制

    框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件就会产生大量的MapTask,处理效率极其低下

    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个MapTask处理

虚拟存储切片最大值设置:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class

job.setInputFormatClass(CombineTextInputFormat.class);

// 虚拟存储切片最大值设置20m = 20971520

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);    //4m

切片机制:


FileInputFormat实现类

    TextInputFormat    处理文本

    KeyValueTextInputFormat    处理键值对

    NLineInputFormat    按行处理

    CombineTextInputFormat    处理小文件

    自定义InputFormat


TextInputFormat

    是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

KeyValueTextInputFormat

    每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");来设定分隔符。默认分隔符是tab(\t)。

NLineInputFormat

    如果使用NLineInputFormat,代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整数,切片数=商+1。

自定义InputFormat

    步骤:

        1)自定义一个类继承FileInputFormat

        2)改写RecordReader,实现一次读取一个完整文件封装为KV

        3)在输出时使用SequenceFileOutPutFormat输出合并文件


MapReduce工作流程

    Map阶段

    1)客户端形成任务切片数

    2)客户端提交切片信息(.split) 配置信息(.xml)和.jar

    3)Yarn根据切片数启动相应的MapTask

    4)MapTask中读取客户端提交的数据,默认使用TextInputFormat按行读取,然后在MapTask中进行逻辑运算并写出

    5)MapTask写出到环形缓存区,环形缓存区对数据进行分区排序(字典顺序,快排),环形缓存区默认100M,写到80%时将环形缓存区数据溢出到文件(分区且区内有序),然后在继续回写环形缓存区

    6)对溢出的文件根据分区归并排序

    7)Combiner合并

        案例:

            Combiner合并案例实操

    Reduce阶段

    1)所有MapTask任务完成后,启动相应数量的ReduceTask(根据分区数决定启动几个ReduceTask)

    2)下载MapTask归并排序后的数据到本地磁盘,如果数据小则直接放到缓存中

    3)合并下载的数据归并排序

    4)分组排序

        案例:

            WritableComparable排序案例实操(全排序)

            WritableComparable排序案例实操(区内排序)

            GroupingComparator分组案例实操

    5)将数据按key相同的原则(一次读取一组)传入Reduce方法中

    6)写出到Part-r-0000000等文件


Shuffle机制

    Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle


Partition分区

    默认Partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks){

        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

    }

}


自定义Partitioner分区

需求:根据手机号前三位输出到不同文件中

    1)自定义类继承Partitioner,重写getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override

    public int getPartition(Text key, FlowBean value, int i) {

        // key 是手机号

        // value 流量信息

        // 获取手机号前三位

        String prePhoneNum = key.toString().substring(0, 3);


        int partition = 4;

        if ("136".equals(prePhoneNum)) {

            partition = 0;

        } else if ("137".equals(prePhoneNum)) {

            partition = 1;

        } else if ("138".equals(prePhoneNum)) {

            partition = 2;

        } else if ("139".equals(prePhoneNum)) {

            partition = 3;

        }

        return partition;

    }

}

    2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(ProvincePartitioner.class);

    3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);


排序

    默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。


排序分类

    1)部分排序

        MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

    2)全排序

        最终输出结果只有一个文件,且文件每部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

    3)辅助排序:(GroupingComparator分组)

        在Reduce端对key进行分组。应用于:在接受key为Bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序

    4)二次排序

        在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序

案例:

    WritableComparable排序案例实操(全排序)

    WritableComparable排序案例实操(区内排序)

    GroupingComparator分组案例实操


Combiner合并

    1)Combiner是MR程序中Mapper和Reducer之外的一种组件

    2)Combiner组件的父类就是Reducer

    3)Combiner和Reducer的区别在于运行的位置:

        Combiner是在每一个MapTask所在的节点运行;

        Reducer是接收全局所有Mapper的输出结果;

    4)Combiner的意义就是对每一个 MapTask的输出进行局部汇总,以减小网络传输量

    5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来

案例:

    Combiner合并案例实操


设置ReduceTask并行度(个数)

//驱动类中加入以下代码即可

//默认值是1 手动设置为4

job.setNumReduceTasks(4);

注意:

1)ReduceTask如果为0,表示没有Reduce阶段,输出文件个数和Map个数一致

2)ReduceTask默认值就是1,所以输出文件个数为一个

3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜

4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个ReduceTask

5)具体多少个ReduceTask,需要根据集群性能而定

6)如果分区数不是1,但是ReduceTask为1,是否执行分区过程?答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行


OutputFormat接口实现类

    OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口

    1.文本输出TextOutputFormat

        默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把他们转换为字符串

    2.SequenceFileOutputFormat

        将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩

    3.自定义OutputFormat


Reduce Join

    Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出

    Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

    Reduce Join案例实操

Reduce Join缺点及解决方案

    缺点:这种方式中,合并的操作是在Reducer阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

    解决方案:Map端实现数据合并


Map Join

    使用场景:Map join适用于一张表十分小,一张表很大的场景

    问:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

    答:在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜

    Map Join案例实操


计数器应用

    Hadoop为每个作业维护若干个内置计数器,以描述多项指标。例如:某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。

    1.计数器API

        1)采用枚举方式统计计数

            enum MyCounter {MALFORORMED,NORMAL}

            //对枚举定义的自定义计数器加1

            context.getCounter(MyCounter.MALFORORMED).increment(1);

        2)采用计数器组,计数器名称的方式统计

            context.getCounter("counterGroup", "counter").increment(1);

        3)计数结果在程序运行后控制台上查看

计数器案例实操


数据清洗(ETL)

    在运行核心业务MapReducer程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理过程往往只需要运行Mapper程序,不需要执行Reducer程序。

数据清洗案例实操

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容