YARN
YARN是Hadoop的一个资源调度框架,下面简单介绍下它的运行流程
以上是YARN的一个流程图,简单介绍
- Client提交任务执行请求
- ResourceManager接收到执行请求后,向Client返回一个作业ID,ResourceManager选择一个NodeManager分配一个Container,此NodeManager启动一个ApplicationMaster(AM)
- AM计算需要的资源及数据,向ResourceManager注册并申请计算所需要的资源
- ResourceManager分配合适的NodeManager给ApplicationMaster,ApplicationMaster与NodeManager协调分配container,启动相应的Task
- Application监控任务的执行情况并与Client通信,报告程序运行状态(作业运行失败,重新执行/在其他节点执行)
- 程序结束,相关Container及ApplicationMaster释放
MapReduce
MapReduce简介
- MapReduce是一个分布式的批处理计算框架,源于Google在2014年12月发布的一篇关于分布式计算的论文,Google并未开源,MapReduce是根据论文思路开发的一套框架,可以说是Google MapReduce的克隆版
- 一个MapReduce程序分为Map阶段和Reduce阶段
- MapReduce的典型特性包括:易于编程(用户只需要在Map和Reduce的实现函数中编写业务逻辑,就可以实现分布式计算)、扩展性好(可根据资源灵活的执行及配置任务)、容错性高(集群中一个节点故障,该节点的任务会调度到其他节点执行)、适合海量数据的离线处理
MapReduce应用场景
- 数据统计,如网站PV、UV统计
- 搜索引擎的索引,比如百度就是用MapReduce来建立索引,MapReduce产生的需求背景就是Google创建搜索索引
- 海量数据查找
- 复杂的数据分析算法的实现比如聚类算法、分类算法、推荐算法等
MapReduce的短板
- 实时计算,实时计算讲究时效性,MapReduce适合大规模数据的计算,在时效性方面就
- 流式计算,流式计算的输入数据是动态的,MapReduce的数据集是静态的,因而无法实现流式计算
- DAG计算,DAG计算讲究多次迭代,多个作业之间存在很严重的依赖关系
MapReduce编程模型
分为Map和Reduce阶段两部分组成
Map阶段
Map阶段由若干Map Task组成
- 从HDFS获取输入数据并解析 InputFormat
- 数据处理 Mapper
- 数据分组:Partitioner
Reduce阶段
由若干Reduce Task组成,输入为Map Task的输出
- 由Map的输出数据远程拷贝
- 数据按照 key排序
- 数据处理 Reducer
- 数据输出:OutputFormat
内部逻辑
以下为MapReduce简要的处理过程图
- 对输入数据进行分片(Split),一般以默认的block进行split,也可以自定义
- 文件分片的过程中,很大可能会出现跨行问题,MapReduce的解决方案是读取完整,跨block分到上一个split,下一split分这些
- 在Split的时候,会将数据解析成key-value对,该默认实现为TextInputFormat,Key为行在文本中的偏移量,Value为行内容,如果一行被截断,则读取下一个的block的前几个字符,Split大小也可以自定义
- Mapper程序读取切分好的数据(通过InputFormat接口)
- Mapper程序对数据进行处理,按照key、value的形式进行整合
- Partitioner对Mapper输出的数据进行分区,决定由哪个Reduce进行处理
- Partitioner的默认实现为HashPartitioner,具体做法大致是对key取Hash值,相对Reduce的数量取余,产生的数字即为Reduce Task号
- Partitioner也允许自定义分区
- 对数据进行Shuffle和Sort,分配到对应的Reducer进行处理
- 结果输出
编程接口
Java编程接口
- 旧API:org.apache.hadoop.mapred,目前不建议使用
- 新:org.apache.hadoop.mapreduce
- 新API具有更好的扩展性,两种API内部执行引擎一样
WordCount
WordCount相当于普通编程语言的HelloWorld
需求:统计规律切分的大规模文本中单词出现次数
文本中单词例子:
hello world this my
hh dd mm dd kk
思路:
- Map阶段每行数据按分隔符切分,单词作为key,1为value
- Reduce阶段:远程从Map阶段的输出结果copy数据,并进行整理排序,把相同的key合并,形成key对应一组value的数据集,即为reduce的输入,reduce针对每个key调用reduce方法,对value求和
编程:
- 编程采用Java进行,使用maven管理jar包,具体环境设置在以后有机会补充,目前请网上查看,资料也不少。需要注意的是由于众所周知的原因,请设置国内maven仓库镜像
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.nanri</groupId>
<artifactId>bigdatalearn</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<executions>
<execution>
<id>default-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
代码:具体见代码注释
package com.nanri.mapr01;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountApp {
//map阶段,输入key、value输出key、value类型
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取数据,将数据转换成字符串
String line = value.toString();
//拆分数据
String[] words = line.split("\t");
for(String word : words) {
//输出需要序列化写出
context.write(new Text(word), new IntWritable(1));
}
}
}
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// 输入:k:v1, v2, v3
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历,将key出现的次数累加
for(IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
//执行程序
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//定义的job名称
String jobName = args[0];
//输入路径
String inputPath = args[1];
//输出路径,执行时此路径在hadoop上必须不存在
String outputPath = args[2];
//执行配置
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 设置作业名称
job.setJobName(jobName);
//设置具体执行类
job.setJarByClass(WordCountApp.class);
//mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//设置mapper阶段的输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reducer阶段的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// 作业完成退出
System.exit(job.waitForCompletion(true)?0:1);
}
}
代码编写完成后,进行打包,将编译好的工程jar包文件上传到服务器,运行hadoop jar bigdatalearn-1.0-SNAPSHOT.jar com.nanri.mapr01.WordCountApp wordcountapp /wordcount/input /wordcount/output
,jar包后第一个参数为入口类所在的完整路径,之后参数为具体main方法中定义的参数,需确保输出路径不存在,稍等一会,即可看到给的跟踪地址及执行成功后的提示
- Mapreduce用户编写的程序分为三个部分:Mapper、Reducer、Driver
- Mapper的输入数据是K-V对的形式,key一般为行号,value为每行数据
- Mapper的输入数据是K-V对的形式,
- Mapper的业务逻辑写在map()方法中
- map()方法对每个<K,V>执行一次
- Reducer的输入数据类型对应Mapper的输出数据类型,也是K-V对
- Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
- 用户自定义的Mapper和Reducer都要继承各自的父类
- 整个程序需要一个Driver来提交,提交的是一个描述了各种必要信息的job对象