Hadoop学习之YARN及MapReduce

YARN

YARN是Hadoop的一个资源调度框架,下面简单介绍下它的运行流程

yarn运行流程.jpg

以上是YARN的一个流程图,简单介绍

  1. Client提交任务执行请求
  2. ResourceManager接收到执行请求后,向Client返回一个作业ID,ResourceManager选择一个NodeManager分配一个Container,此NodeManager启动一个ApplicationMaster(AM)
  3. AM计算需要的资源及数据,向ResourceManager注册并申请计算所需要的资源
  4. ResourceManager分配合适的NodeManager给ApplicationMaster,ApplicationMaster与NodeManager协调分配container,启动相应的Task
  5. Application监控任务的执行情况并与Client通信,报告程序运行状态(作业运行失败,重新执行/在其他节点执行)
  6. 程序结束,相关Container及ApplicationMaster释放

MapReduce

MapReduce简介

  • MapReduce是一个分布式的批处理计算框架,源于Google在2014年12月发布的一篇关于分布式计算的论文,Google并未开源,MapReduce是根据论文思路开发的一套框架,可以说是Google MapReduce的克隆版
  • 一个MapReduce程序分为Map阶段和Reduce阶段
  • MapReduce的典型特性包括:易于编程(用户只需要在Map和Reduce的实现函数中编写业务逻辑,就可以实现分布式计算)、扩展性好(可根据资源灵活的执行及配置任务)、容错性高(集群中一个节点故障,该节点的任务会调度到其他节点执行)、适合海量数据的离线处理

MapReduce应用场景

  • 数据统计,如网站PV、UV统计
  • 搜索引擎的索引,比如百度就是用MapReduce来建立索引,MapReduce产生的需求背景就是Google创建搜索索引
  • 海量数据查找
  • 复杂的数据分析算法的实现比如聚类算法、分类算法、推荐算法等

MapReduce的短板

  • 实时计算,实时计算讲究时效性,MapReduce适合大规模数据的计算,在时效性方面就
  • 流式计算,流式计算的输入数据是动态的,MapReduce的数据集是静态的,因而无法实现流式计算
  • DAG计算,DAG计算讲究多次迭代,多个作业之间存在很严重的依赖关系

MapReduce编程模型

分为Map和Reduce阶段两部分组成

Map阶段

Map阶段由若干Map Task组成

  • 从HDFS获取输入数据并解析 InputFormat
  • 数据处理 Mapper
  • 数据分组:Partitioner

Reduce阶段

由若干Reduce Task组成,输入为Map Task的输出

  • 由Map的输出数据远程拷贝
  • 数据按照 key排序
  • 数据处理 Reducer
  • 数据输出:OutputFormat

内部逻辑

以下为MapReduce简要的处理过程图


mapreduce内部逻辑.jpg
  1. 对输入数据进行分片(Split),一般以默认的block进行split,也可以自定义
  • 文件分片的过程中,很大可能会出现跨行问题,MapReduce的解决方案是读取完整,跨block分到上一个split,下一split分这些
  • 在Split的时候,会将数据解析成key-value对,该默认实现为TextInputFormat,Key为行在文本中的偏移量,Value为行内容,如果一行被截断,则读取下一个的block的前几个字符,Split大小也可以自定义
  1. Mapper程序读取切分好的数据(通过InputFormat接口)
  2. Mapper程序对数据进行处理,按照key、value的形式进行整合
  3. Partitioner对Mapper输出的数据进行分区,决定由哪个Reduce进行处理
  • Partitioner的默认实现为HashPartitioner,具体做法大致是对key取Hash值,相对Reduce的数量取余,产生的数字即为Reduce Task号
  • Partitioner也允许自定义分区
  1. 对数据进行Shuffle和Sort,分配到对应的Reducer进行处理
  2. 结果输出

编程接口

Java编程接口

  • 旧API:org.apache.hadoop.mapred,目前不建议使用
  • 新:org.apache.hadoop.mapreduce
  • 新API具有更好的扩展性,两种API内部执行引擎一样

WordCount

WordCount相当于普通编程语言的HelloWorld

需求:统计规律切分的大规模文本中单词出现次数

文本中单词例子:

hello   world   this    my
hh  dd  mm  dd  kk
思路:
  1. Map阶段每行数据按分隔符切分,单词作为key,1为value
  2. Reduce阶段:远程从Map阶段的输出结果copy数据,并进行整理排序,把相同的key合并,形成key对应一组value的数据集,即为reduce的输入,reduce针对每个key调用reduce方法,对value求和
编程:
  1. 编程采用Java进行,使用maven管理jar包,具体环境设置在以后有机会补充,目前请网上查看,资料也不少。需要注意的是由于众所周知的原因,请设置国内maven仓库镜像
    pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.nanri</groupId>
    <artifactId>bigdatalearn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码:具体见代码注释

package com.nanri.mapr01;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountApp {
//map阶段,输入key、value输出key、value类型
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //读取数据,将数据转换成字符串
            String line = value.toString();
            //拆分数据
            String[] words = line.split("\t");
            for(String word : words) {
                //输出需要序列化写出
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // 输入:k:v1, v2, v3
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定义一个计数器
            int count = 0;
            //遍历,将key出现的次数累加
            for(IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

//执行程序
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //定义的job名称
        String jobName = args[0];
        //输入路径
        String inputPath = args[1];
        //输出路径,执行时此路径在hadoop上必须不存在
        String outputPath = args[2];
        //执行配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置作业名称
        job.setJobName(jobName);
        //设置具体执行类
        job.setJarByClass(WordCountApp.class);
        //mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //设置mapper阶段的输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reducer阶段的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 作业完成退出
        System.exit(job.waitForCompletion(true)?0:1);
    }
}


代码编写完成后,进行打包,将编译好的工程jar包文件上传到服务器,运行hadoop jar bigdatalearn-1.0-SNAPSHOT.jar com.nanri.mapr01.WordCountApp wordcountapp /wordcount/input /wordcount/output,jar包后第一个参数为入口类所在的完整路径,之后参数为具体main方法中定义的参数,需确保输出路径不存在,稍等一会,即可看到给的跟踪地址及执行成功后的提示

  • Mapreduce用户编写的程序分为三个部分:Mapper、Reducer、Driver
  • Mapper的输入数据是K-V对的形式,key一般为行号,value为每行数据
  • Mapper的输入数据是K-V对的形式,
  • Mapper的业务逻辑写在map()方法中
  • map()方法对每个<K,V>执行一次
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是K-V对
  • Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
  • 用户自定义的Mapper和Reducer都要继承各自的父类
  • 整个程序需要一个Driver来提交,提交的是一个描述了各种必要信息的job对象
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 583评论 0 1
  • 思考问题 MapReduce总结 MapReduce MapReduce的定义MapReduce是一种编程模型, ...
    Sakura_P阅读 934评论 0 1
  • MapReduce框架结构## MapReduce是一个用于大规模数据处理的分布式计算模型MapReduce模型主...
    Bloo_m阅读 3,724评论 0 4
  • “有些人一直没机会见,等有机会见了,却又犹豫了,相见不如不见。 有些事一别竟是一辈子,一直没机会做,等有机会了,...
    楠楠小姐姐阅读 1,959评论 0 2
  • 凌晨出门,等车上班 惊讶于,昨晚的一夜西风,竟不知何时,又吹落了,一地的,国槐 细细的,密密的,又有点稀稀疏疏的,...
    夜语婉婷阅读 181评论 0 0