九. MapReduce 案例
9.1 统计各个手机号的上传和下载流量总和
数据展示:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
数据解释:
# 每行数据的第二列数据是手机号,倒数第三列表示上行流量,倒数第二列表示下行流量
输出格式要求:
# 手机号 上行流量 下行流量 总流量
最终统计结果为:
13726230503 上传流量:4962 下载流量:49362 总数据流量: 54324
13826544101 上传流量:528 下载流量:0 总数据流量: 528
13926251106 上传流量:480 下载流量:0 总数据流量: 480
13926435656 上传流量:264 下载流量:3024 总数据流量: 3288
创建数据文件上传到HDFS文件系统中
[root@hadoop5 ~]# vim access.log
[root@hadoop5 ~]# cat access.log
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
[root@hadoop5 ~]# hdfs dfs -mkdir -p /accesslog
[root@hadoop5 ~]# hdfs dfs -put access.log /accesslog
编写mapreduce的job作业完成统计
//统计手机流量
public class AccessLogJob extends Configured implements Tool {
private static Logger logger = Logger.getLogger(AccessLogJob.class);
public static void main(String[] args) throws Exception {
ToolRunner.run(new AccessLogJob(),args);
}
@Override
public int run(String[] strings) throws Exception {
//创建job作业
Job job = Job.getInstance(getConf(), "access-log");
job.setJarByClass(AccessLogJob.class);
//设置InputFormate
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));
//设置map
job.setMapperClass(AccessLogMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//shuffle 无须设置 自动完成
//设置reduce
job.setReducerClass(AccessLogReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置Output Format
job.setOutputFormatClass(TextOutputFormat.class);
Path res = new Path("/accesslog/res");
FileSystem fileSystem = FileSystem.get(getConf());
if(fileSystem.exists(res)) {
fileSystem.delete(res,true);
}
TextOutputFormat.setOutputPath(job, res);
//提交job作业
boolean status = job.waitForCompletion(true);
System.out.println("本次作业执行状态 = " + status);
return 0;
}
public static class AccessLogMap extends Mapper<LongWritable, Text,Text,Text>{
@Override //参数1:行首字母偏移量 参数2:当前row数据 参数3:map输出上下文
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split("\t");
//输出key 为手机号 值为: 每个手机号"上传-下载流量"格式文本
context.write(new Text(values[1]),new Text(values[values.length-3]+"-"+values[values.length-2]));
logger.info("手机号: "+values[1]+" 流量格式:"+values[values.length-3]+"-"+values[values.length-2]);
}
}
//reduce
public static class AccessLogReduce extends Reducer<Text,Text,Text,Text>{
@Override //参数1:map的key 参数2:相当key的数组 参数3:Reduce输出的上下文
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int uploadData = 0; //保存上传流量
int downData = 0; //保存下载流量
for (Text value : values) {
String[] datas = value.toString().split("-");
uploadData+= Integer.valueOf(datas[0]);
downData+= Integer.valueOf(datas[1]);
}
int total = uploadData + downData;//保存总流量
//输出
context.write(key,new Text(" 上传流量:"+uploadData+" 下载流量:"+downData+" 总数据流量: "+total));
logger.info("手机号: "+key+" 上传流量:"+uploadData+" 下载流量:"+downData+" 总数据流量: "+total);
}
}
}
运行mapreduce
查看结果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/res/part-r-00000
13726230503 上传流量:4962 下载流量:49362 总数据流量: 54324
13826544101 上传流量:528 下载流量:0 总数据流量: 528
13926251106 上传流量:480 下载流量:0 总数据流量: 480
13926435656 上传流量:264 下载流量:3024 总数据流量: 3288
9.2 自定义MapReduce中数据类型
MapReduce的执行过程,无论是map阶段还是Reduce阶段都会跨JVM,通过网络通信传递数据,索引对于传递数据必须实现序列化,为此Hadoop的MapReduce模型对现有的数据类型进行了近一步的包装,如之前用到的IntWriteable
、LongWritable
、Text
、 DoubleWritable
、NullWritable
。如果处理简单计算有这些基础类型就够了,但是如果需要复杂结果是这些数据类型远远是不够的,因此我们需要根据实际情况自定义数据类型!
9.2.1 查看提供已知数据类型类图
通过类图得知hadoop提供的数据类型都间接实现了:
Wirtable
、Comparable
。直接实现WritableComparable
接口,因此我们自定义类型也需要实现相应的接口
9.2.2 查看WriteComparable接口
通过查看源码得知自定义的数据类型需要实现类中
wirte
、readFiles
、compareTo
、hashCode
和equals
、toString
等相关方法。
9.2.3 根据之前流量案例定义自定义类型
开发自定义Writable类型
//自定义Writable类型
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {
private Integer upload;
private Integer down;
private Integer total;
@Override
public int compareTo(AccessLogWritable o) {
return this.total-o.getTotal();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(upload);
out.writeInt(down);
out.writeInt(total);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upload = in.readInt();
this.down = in.readInt();
this.total = in.readInt();
}
@Override
public String toString() {
return "统计结果{" +
"上传流量=" + upload +
", 下载流量=" + down +
", 上传下载总流量=" + total +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AccessLogWritable accessLogWritable = (AccessLogWritable) o;
return Objects.equals(upload, accessLogWritable.upload) &&
Objects.equals(down, accessLogWritable.down) &&
Objects.equals(total, accessLogWritable.total);
}
@Override
public int hashCode() {
return Objects.hash(upload, down, total);
}
public Integer getUpload() {
return upload;
}
public void setUpload(Integer upload) {
this.upload = upload;
}
public Integer getDown() {
return down;
}
public void setDown(Integer down) {
this.down = down;
}
public Integer getTotal() {
return total;
}
public void setTotal(Integer total) {
this.total = total;
}
public AccessLogWritable() {
}
public AccessLogWritable(Integer upload, Integer down, Integer total) {
this.upload = upload;
this.down = down;
this.total = total;
}
public AccessLogWritable(Integer upload, Integer down) {
this.upload = upload;
this.down = down;
}
}
注意:write的顺序和read的顺序必须严格一致,读的类型和写的类型也必须完全一致
开发Job作业
public class AccessLogCustomerTypeJob extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new AccessLogCustomerTypeJob(),args);
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "customer-type-job");
job.setJarByClass(AccessLogCustomerTypeJob.class);
//设置input format
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));
//设置map
job.setMapperClass(AccessLogCustomerTypeMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AccessLogWritable.class);
//shuffle 无须设置 自动处理
//设置reduce
job.setReducerClass(AccessLogCustomerTypeReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(AccessLogWritable.class);
//设置Output Format
job.setOutputFormatClass(TextOutputFormat.class);
Path res = new Path("/accesslog/res2");
FileSystem fileSystem = FileSystem.get(getConf());
if(fileSystem.exists(res)){
fileSystem.delete(res,true);
}
TextOutputFormat.setOutputPath(job, res);
//提交作业
boolean status = job.waitForCompletion(true);
System.out.println("作业执行状态:" + status);
return 0;
}
public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split("\t");
int upload = Integer.valueOf(values[values.length-3]);
int down = Integer.valueOf(values[values.length-2]);
context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
}
}
public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
@Override
protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
int upload =0;
int down = 0;
for (AccessLogWritable value : values) {
upload += value.getUpload();
down += value.getDown();
}
context.write(key,new AccessLogWritable(upload,down,upload+down));
}
}
}
执行job作业
[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 10:33:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 10:34:00 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 10:34:00 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 10:34:01 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 10:34:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0013
19/12/20 10:34:03 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0013
19/12/20 10:34:03 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0013/
19/12/20 10:34:03 INFO mapreduce.Job: Running job: job_1576780523481_0013
19/12/20 10:34:27 INFO mapreduce.Job: Job job_1576780523481_0013 running in uber mode : false
19/12/20 10:34:27 INFO mapreduce.Job: map 0% reduce 0%
19/12/20 10:34:43 INFO mapreduce.Job: map 100% reduce 0%
19/12/20 10:35:00 INFO mapreduce.Job: map 100% reduce 100%
19/12/20 10:35:02 INFO mapreduce.Job: Job job_1576780523481_0013 completed successfully
查看结果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/res2/part-r-00000
13726230503 统计结果{上传流量=4962, 下载流量=49362, 上传下载总流量=54324}
13826544101 统计结果{上传流量=528, 下载流量=0, 上传下载总流量=528}
13926251106 统计结果{上传流量=480, 下载流量=0, 上传下载总流量=480}
13926435656 统计结果{上传流量=264, 下载流量=3024, 上传下载总流量=3288}
十. MapReduce的高级特性
10.1 MapRedcuce的数据清洗
10.1.1 数据清洗
所谓数据清洗
指的是在复杂的数据格式中获取我们需要的数据过程称之为数据清洗,整个过程仅仅是将复杂数据中我们需要的数据清洗出来,不涉及任何的统计计算工作,如下图展示过程就是数据清洗:
10.1.2 数据清洗编程思路分析
10.1.3 开发数据清洗
//开发数据清洗
public class DataCleanAccessLogJob extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new DataCleanAccessLogJob(),args);
}
@Override
public int run(String[] args) throws Exception {
//创建job作业
Job job = Job.getInstance(getConf(), "data-clean-access-log-job");
job.setJarByClass(DataCleanAccessLogJob.class);
//设置input format
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));
//设置map
job.setMapperClass(DataCleanAccessLogMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//跳过shuffle 和 reduce
job.setNumReduceTasks(0);
//设置output format
job.setOutputFormatClass(TextOutputFormat.class);
Path res = new Path("/accesslog/cleandata");
FileSystem fileSystem = FileSystem.get(getConf());
if(fileSystem.exists(res)){
fileSystem.delete(res,true);
}
TextOutputFormat.setOutputPath(job,res);
//提交job
boolean status = job.waitForCompletion(true);
System.out.println("作业提交状态 = " + status);
return 0;
}
//map阶段
public static class DataCleanAccessLogMap extends Mapper<LongWritable, Text,Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split("\t");
Text keyout = new Text(values[1]+"\t"+values[6] + "\t" + values[7]);
context.write(keyout, NullWritable.get());
}
}
//没有reduce阶段
}
注意: 设置job.setNumReduceTasks(0);这句话本次的mapreduce就跳过了reduce阶段的执行
10.1.4 运行作业
[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 13:22:03 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 13:22:07 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 13:22:07 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 13:22:08 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 13:22:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0018
19/12/20 13:22:10 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0018
19/12/20 13:22:10 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0018/
19/12/20 13:22:10 INFO mapreduce.Job: Running job: job_1576780523481_0018
19/12/20 13:22:36 INFO mapreduce.Job: Job job_1576780523481_0018 running in uber mode : false
19/12/20 13:22:36 INFO mapreduce.Job: map 0% reduce 0%
19/12/20 13:22:51 INFO mapreduce.Job: map 100% reduce 0%
19/12/20 13:22:53 INFO mapreduce.Job: Job job_1576780523481_0018 completed successfully
10.1.5 查看运行结果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/cleandata/part*
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
十一. MapReduce的高级特性
11.1 MapReduce中Map的数量
# MapReduce运行过程中Map的数量是由block所决定的:
也就是一个文件分为几个block就是几个map
注意:map的数量由block块决定,也就意味着一旦文件确定根据默认配置划分block也将确定,所以我们没有办法在程序中手动干预map的执行数量
11.2 MapReduce中Reduce的数量
# Reduce的数量是可以在程序中手动指定
默认数量为: 1个 Reduce
可以通过: job.setNumReduceTasks(0); 0 就是没有 数字是几就是几个
11.2.1 通过修改word count案例reduce数量比较不同
使用默认reduce数量也就是1的执行结果:
将reduce数量修改为多个这里修改为2个,运行查看结果
//省略............
//设置reduce 阶段
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置reduce数量
job.setNumReduceTasks(2);
//省略..........
运行job作业
查看结果
通过这里可以总结出:有几个reduce就会生成几个结果文件,多个reduce同时处理数据将原来一个reduce处理结果,分到了不同的reduce处理文件中,因此如果日后需要将所有结果数据汇总在一起之能设置一个reduce,如果想要将结果划分到多个文件中可以设置多个reduce数量
11.2.2 为什么要设置多个reduce数量?
# 1.提高MR的运行效率,从而快速统计计算工作
11.2.3 多个Reduce如何去分配map中数据?
一旦设置了多个reduce,如何让多个reduce均分map中统计的数据,这里面还有一个分区(Partition)概念,一个Reduce会形成一个分区,默认使用的是HashPartitioner会根据map输出key做hash运算去决定map中输出数据交给那个reduce处理
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
11.2.4 如何自定义分区(Partitoner)
自定义分区:可以根据业务规则将统计结果划分到不同分区中
数据格式
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
要求
# 统计流量,并将不同省份数据的统计结果放在不同文件中
自定义分区
//自定义分区 输入数据map端结果
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {
//根据业务规则将不同省份结果划分到不同分区
private static HashMap<String,Integer> provincePartitioners = new HashMap<>();
static{
provincePartitioners.put("136",0);
provincePartitioners.put("137",1);
provincePartitioners.put("138",2);
provincePartitioners.put("139",3);
}
// 返回分区号给那个reduce
@Override
public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
String keyPrefix = key.toString().substring(0, 3);
Integer partionId = provincePartitioners.get(keyPrefix);
return partionId ==null?4: partionId;
}
}
在job作业中指定分区
//设置分区
job.setPartitionerClass(ProvincePartitioner.class);
//设置reduce数量
job.setNumReduceTasks(5);
运行job作业
查看结果
11.3 计数器(Counter)
计数器:顾名思义就是用来对map运行数量和reduce运行数量进行统计的
11.3.1 在map中使用计数器
public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//计数器
Counter access_map = context.getCounter("map-group", "access_map");
access_map.increment(1);
String[] values = value.toString().split("\t");
int upload = Integer.valueOf(values[values.length-3]);
int down = Integer.valueOf(values[values.length-2]);
context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
}
}
11.3.2 在reduce中使用计数器
public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
@Override
protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
//计数器
Counter access_map = context.getCounter("reduce-group", "access_reduce");
access_map.increment(1);
int upload =0;
int down = 0;
for (AccessLogWritable value : values) {
upload += value.getUpload();
down += value.getDown();
}
context.write(key,new AccessLogWritable(upload,down,upload+down));
}
}
11.4 Combiner 合并
Combiner合并:又称之为map端的reduce,主要是通过对map局部的数据先进行一次reduce,从而来减少map端输出数据频繁发送给Reduce处理时所带来的网络压力问题。通过这种提前对map输出做一次局部reduce,这样既可以减轻网络压力,又能提高效率。在mapreduce编程模型中默认是关闭的。
11.4.1 开启Combiner
.....................
//设置map
job.setMapperClass(AccessLogCustomerTypeMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AccessLogWritable.class);
//shuffle 无须设置 自动处理
//设置Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);
//设置分区
job.setPartitionerClass(ProvincePartitioner.class);
...............
11.4.2 运行设置Combiner的job
没有设置Combiner时:
设置Combiner时:
十二. Job作业原理分析
12.1 Input Format 原理解析
12.1.1 类图和源码
查看Text InputFormat的类图
注意:通过类图发现最顶层父类为Input Format这个类
查看Input Formt 源码:
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i><input-file-path, start, offset></i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
/**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
* @throws InterruptedException
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
-
getSplits方法:
是用来根据block计算逻辑上的切片,每一个逻辑切片对应一个map操作 -
createRecordReader方法:
用来将切片数据封装成一行(LongWritable,Text)
12.1.2 Input Format切片的计算方式
说明:
1. 当一个文件小于128M时一定会被分成一个逻辑切片,Block块与Split(切片)一一对应。
2.当一个文件大于128M,剩余大小大于切片的1.1倍,Block块与Split(切片)一一对应。反之如果一个文件大于128M,剩余大小小于切片的1.1倍,此时将划分为一个切片。
12.1.3 将数据封装成key和value
12.2 Map源码原理分析
这里只给出了核心方法:
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
12.3 Reduce源码分析
查看reduce的源码,这里只罗列核心代码:
**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
12.4 OutputFormat 源码分析
查看源类图的结构:
注意:和 InputFormat基本是类似的,最顶层父类为Output Format:
TextOutputFormat部分源码如下:
/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
}
通过源码得知默认的key和value输出分割符为"tab键"也就是这里的"\t"
12.5 Shuffle的分析
shuffle阶段是一个整体叫法: 其实又分为
Map端的shuffle
和Reduce端的Shuffle
12.5.1 Map端shuffle
12.5.2 Reduce端的shuffle
12.6 MapReduce整体运行原理
#1.计算切片
有几个切片就有几个map task
#2.环形缓存区
经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。
环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。
#3.溢出
环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排)
缓存区不断溢出的数据形成多个小文件
#4.合并
溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件
通过归并排序保证区内的数据有序
#5.shuffle
从过程2到过程7之间,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的,
每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。
#6.合并
运行reducetask的节点通过过程5,将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录。这多个分区文件通过归并排序合并成大文件,并根据key值分好组(key值相同的,value值会以迭代器的形式组在一起)。
#7.reducetask
reducetask从本地工作目录获取已经分好组并且排好序的数据,将数据进行reduce函数中的逻辑处理。
#8.输出
每个reducetask输出一个结果文件。
十三. MapReduce与Yarn
13.1 Job作业提交过程
客户端的配置信息mapreduce.framework.name为yarn时,客户端会启动YarnRunner(yarn的客户端程序),并将mapreduce作业提交给yarn平台处理。
1.向ResourceManager请求运行一个mapreduce程序。
2.ResourceManager返回hdfs地址,告诉客户端将作业运行相关的资源文件上传到hdfs。
3.客户端提交mr程序运行所需的文件(包括作业的jar包,作业的配置文件,分片信息等)到hdfs上。
4.作业相关信息提交完成后,客户端用过调用ResourcrManager的submitApplication()方法提交作业。
5.ResourceManager将作业传递给调度器,调度器的默认调度策略是先进先出。
6.调度器寻找一台空闲的节点,并在该节点隔离出一个容器(container),容器中分配了cpu,内存等资源,并启动MRAppmaster进程。
7.MRAppmaster根据需要运行多少个map任务,多少个reduce任务向ResourceManager请求资源。
8.ResourceManager分配相应数量的容器,并告知MRAppmaster容器在哪。
9.MRAppmaster启动maptask。
10.maptask从HDFS获取分片数据执行map逻辑。
11.map逻辑执行结束后,MRAppmaster启动reducetask。
12.reducetask从maptask获取属于自己的分区数据执行reduce逻辑。
13.reduce逻辑结束后将结果数据保存到HDFS上。
14.mapreduce作业结束后,MRAppmaster通知ResourceManager结束自己,让ResourceManager回收所有资源。
十四. HA的hadoop集群搭建
14.1 集群规划
# 集群规划
10.15.0.20 zk zknodes 通过一个节点充当整个集群
10.15.0.22 hadoop22 NameNode (active) & ZKFC
10.15.0.23 hadoop23 NameNode (standby) & ZKFC
10.15.0.24 hadoop24 ResourceManager(active)
10.15.0.25 hadoop25 ResourceManager(standby)
10.15.0.26 hadoop26 DataNode & JournalNode & NodeManager
10.15.0.27 hadoop27 DataNode & JournalNode & NodeManager
10.15.0.28 hadoop28 DataNode & JournalNode & NodeManager
# 克隆机器做准备工作:
0.修改ip地址为上述ip
1.修改主机名/etc/hostsname为上述对应主机名 修改完必须重新启动
2.配置主机名ip地址映射/etc/hosts文件并同步所有节点
10.15.0.20 zk
10.15.0.22 hadoop22
10.15.0.23 hadoop23
10.15.0.24 hadoop24
10.15.0.25 hadoop25
10.15.0.26 hadoop26
10.15.0.27 hadoop27
10.15.0.28 hadoop28
3.所有节点安装jdk并配置环境变量
4.关闭所有机器的网络防火墙配置
systemctl stop firewalld
systemctl disable firewalld
5.所有节点安装centos7.x搭建集群的依赖
yum install psmisc -y
6.配置ssh免密登录
hadoop22 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
hadoop23 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
hadoop24 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
hadoop25 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
14.2 搭建zk集群
# 1.安装zk安装包
[root@zk ~]# tar -zxvf zookeeper-3.4.12.tar.gz
# 2.准备zk的数据文件夹
[root@zk ~]# mkdir zkdata1 zkdata2 zkdata3
# 3.在每个数据文件夹中准备集群唯一标识文件myid
[root@zk ~]# echo "1" >> zkdata1/myid
[root@zk ~]# echo "2" >> zkdata2/myid
[root@zk ~]# echo "3" >> zkdata3/myid
# 4.在每个数据文件夹中准备zk的配置文件zoo.cfg
[root@zk ~]# vim /root/zkdata1/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata1
clientPort=3001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
[root@zk ~]# vim /root/zkdata2/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata2
clientPort=4001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
[root@zk ~]# vim /root/zkdata3/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata3
clientPort=5001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
# 5.进入zk安装目录bin目录执行如下命令启动zk集群:
[root@zk bin]# ./zkServer.sh start /root/zkdata1/zoo.cfg
[root@zk bin]# ./zkServer.sh start /root/zkdata2/zoo.cfg
[root@zk bin]# ./zkServer.sh start /root/zkdata3/zoo.cfg
# 6.进入zk安装目录bin目录执行如下命令查看集群状态
./zkServer.sh status /root/zkdata1/zoo.cfg
./zkServer.sh status /root/zkdata2/zoo.cfg
./zkServer.sh status /root/zkdata3/zoo.cfg
14.3 搭建hadoop的HA集群
# 1.在hadoop22--hadoop28上安装hadoop安装包
tar -zxf hadoop-2.9.2.tar.gz
# 2.在hadoop22--hadoop28机器上配置hadoop环境变量
# 3.在hadoop22节点上配置hadoop-env.sh文件
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
# 4.在hadoop22节点上配置core-site.xml文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/core-site.xml
<!--hdfs主要入口不再是一个具体机器而是一个虚拟的名称 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns</value>
</property>
<!-- hadoop临时目录位置 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-2.9.2/data</value>
</property>
<!--zk集群的所有节点-->
<property>
<name>ha.zookeeper.quorum</name>
<value>zk:3001,zk:4001,zk:5001</value>
</property>
# 5.在hadoop2节点上配置hdfs-site.xml文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/hdfs-site.xml
<!--指定hdfs的nameservice为ns,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns</value>
</property>
<!-- ns下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn1</name>
<value>hadoop22:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn1</name>
<value>hadoop22:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn2</name>
<value>hadoop23:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn2</name>
<value>hadoop23:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop26:8485;hadoop27:8485;hadoop28:8485/ns</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/journal</value>
</property>
<!-- 开启NameNode故障时自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制,如果ssh是默认22端口,value直接写sshfence即可 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 使用隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
# 6.在hadoop2节点上配置yarn-site.xml文件
<!-- 开启RM高可用 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop24</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop25</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>hadoop24:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>hadoop25:8088</value>
</property>
<!-- 指定zk集群地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>zk:3001,zk:4001,zk:5001</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
# 7.在hadoop2节点上配置mapred-site.xml文件,默认不存在需要复制
[root@hadoop22 ~]# cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/mapred-site.xml
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
# 8.在hadoop2节点上配置slaves文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/slaves
hadoop26
hadoop27
hadoop28
# 9.同步集群配置文件
scp -r hadoop-2.9.2/etc/hadoop/ hadoop23:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop24:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop25:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop26:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop27:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop28:/root/hadoop-2.9.2/etc/
# 10.启动HDFS的高可用
1.在任意NameNode上格式化ZK
hdfs zkfc -formatZK
2.在hadoop26 hadoop27 hadoop28启动journal node
[root@hadoop26 ~]# hadoop-daemon.sh start journalnode
[root@hadoop27 ~]# hadoop-daemon.sh start journalnode
[root@hadoop28 ~]# hadoop-daemon.sh start journalnode
3.在活跃的NameNode节点上执行格式化
[root@hadoop22 ~]# hdfs namenode -format ns
4.在NameNode上启动hdfs集群
[root@hadoop22 ~]# start-dfs.sh
5.在standby的NameNode上执行
[root@hadoop23 ~]# hdfs namenode -bootstrapStandby
6.在standby的NameNode执行
[root@hadoop23 ~]# hadoop-daemon.sh start namenode
# 11.在活跃节点上启动yarn集群
1.在活跃的resourcemang节点上执行
[root@hadoop24 ~]# start-yarn.sh
2.在standby的节点上执行
[root@hadoop25 ~]# yarn-daemon.sh start resourcemanager
# 12.测试集群