hadoop学习二

九. MapReduce 案例

9.1 统计各个手机号的上传和下载流量总和

数据展示:

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200

数据解释:

# 每行数据的第二列数据是手机号,倒数第三列表示上行流量,倒数第二列表示下行流量

输出格式要求:

# 手机号 上行流量    下行流量    总流量

最终统计结果为:

13726230503  上传流量:4962  下载流量:49362  总数据流量:  54324
13826544101  上传流量:528  下载流量:0  总数据流量:  528
13926251106  上传流量:480  下载流量:0  总数据流量:  480
13926435656  上传流量:264  下载流量:3024  总数据流量:  3288

创建数据文件上传到HDFS文件系统中

[root@hadoop5 ~]# vim access.log
[root@hadoop5 ~]# cat access.log
                1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
                1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
                1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
                1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
                1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
                1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
                1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
                1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
[root@hadoop5 ~]# hdfs dfs -mkdir -p /accesslog
[root@hadoop5 ~]# hdfs dfs -put access.log /accesslog
image-20191221171759635
image-20191221171920552

编写mapreduce的job作业完成统计

//统计手机流量
public class AccessLogJob extends Configured implements Tool {

    private static Logger logger = Logger.getLogger(AccessLogJob.class);

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogJob(),args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        //创建job作业
        Job job = Job.getInstance(getConf(), "access-log");
        job.setJarByClass(AccessLogJob.class);

        //设置InputFormate
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //设置map
        job.setMapperClass(AccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //shuffle  无须设置 自动完成

        //设置reduce
        job.setReducerClass(AccessLogReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //设置Output Format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)) {
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job, res);

        //提交job作业
        boolean status = job.waitForCompletion(true);
        System.out.println("本次作业执行状态 = " + status);

        return 0;
    }



    public static class AccessLogMap extends Mapper<LongWritable, Text,Text,Text>{

        @Override //参数1:行首字母偏移量  参数2:当前row数据 参数3:map输出上下文
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            //输出key 为手机号  值为: 每个手机号"上传-下载流量"格式文本
            context.write(new Text(values[1]),new Text(values[values.length-3]+"-"+values[values.length-2]));

            logger.info("手机号: "+values[1]+"  流量格式:"+values[values.length-3]+"-"+values[values.length-2]);
        }
    }
    //reduce
    public static class AccessLogReduce extends Reducer<Text,Text,Text,Text>{
        @Override //参数1:map的key  参数2:相当key的数组   参数3:Reduce输出的上下文
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int uploadData = 0; //保存上传流量
            int downData = 0;   //保存下载流量
            for (Text value : values) {
                String[] datas = value.toString().split("-");
                uploadData+= Integer.valueOf(datas[0]);
                downData+= Integer.valueOf(datas[1]);
            }
            int total = uploadData + downData;//保存总流量

            //输出
            context.write(key,new Text(" 上传流量:"+uploadData+"  下载流量:"+downData+"  总数据流量:  "+total));
            logger.info("手机号: "+key+" 上传流量:"+uploadData+"  下载流量:"+downData+"  总数据流量:  "+total);
        }
    }

}

运行mapreduce

image-20191221183828609

查看结果

[root@hadoop5 ~]# hdfs dfs -text /accesslog/res/part-r-00000
13726230503  上传流量:4962  下载流量:49362  总数据流量:  54324
13826544101  上传流量:528  下载流量:0  总数据流量:  528
13926251106  上传流量:480  下载流量:0  总数据流量:  480
13926435656  上传流量:264  下载流量:3024  总数据流量:  3288
image-20191221184603660

9.2 自定义MapReduce中数据类型

MapReduce的执行过程,无论是map阶段还是Reduce阶段都会跨JVM,通过网络通信传递数据,索引对于传递数据必须实现序列化,为此Hadoop的MapReduce模型对现有的数据类型进行了近一步的包装,如之前用到的IntWriteableLongWritableTextDoubleWritableNullWritable。如果处理简单计算有这些基础类型就够了,但是如果需要复杂结果是这些数据类型远远是不够的,因此我们需要根据实际情况自定义数据类型!

9.2.1 查看提供已知数据类型类图

image-20191225122724269

通过类图得知hadoop提供的数据类型都间接实现了:WirtableComparable 。直接实现WritableComparable接口,因此我们自定义类型也需要实现相应的接口

9.2.2 查看WriteComparable接口

image-20191221202600106

通过查看源码得知自定义的数据类型需要实现类中 wirtereadFilescompareTohashCodeequalstoString等相关方法。

9.2.3 根据之前流量案例定义自定义类型

开发自定义Writable类型

//自定义Writable类型
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {
    private Integer upload;
    private Integer down;
    private Integer total;

    @Override
    public int compareTo(AccessLogWritable o) {
        return this.total-o.getTotal();
    }
        
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(down);
        out.writeInt(total);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.down = in.readInt();
        this.total = in.readInt();
    }

    @Override
    public String toString() {
        return "统计结果{" +
                "上传流量=" + upload +
                ", 下载流量=" + down +
                ",  上传下载总流量=" + total +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AccessLogWritable accessLogWritable = (AccessLogWritable) o;
        return Objects.equals(upload, accessLogWritable.upload) &&
                Objects.equals(down, accessLogWritable.down) &&
                Objects.equals(total, accessLogWritable.total);
    }

    @Override
    public int hashCode() {
        return Objects.hash(upload, down, total);
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDown() {
        return down;
    }

    public void setDown(Integer down) {
        this.down = down;
    }

    public Integer getTotal() {
        return total;
    }

    public void setTotal(Integer total) {
        this.total = total;
    }

    public AccessLogWritable() {
    }

    public AccessLogWritable(Integer upload, Integer down, Integer total) {
        this.upload = upload;
        this.down = down;
        this.total = total;
    }

    public AccessLogWritable(Integer upload, Integer down) {
        this.upload = upload;
        this.down = down;
    }
}

注意:write的顺序和read的顺序必须严格一致,读的类型和写的类型也必须完全一致

开发Job作业

public class AccessLogCustomerTypeJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogCustomerTypeJob(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "customer-type-job");
        job.setJarByClass(AccessLogCustomerTypeJob.class);

        //设置input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //设置map
        job.setMapperClass(AccessLogCustomerTypeMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AccessLogWritable.class);

        //shuffle 无须设置 自动处理

        //设置reduce
        job.setReducerClass(AccessLogCustomerTypeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(AccessLogWritable.class);

        //设置Output Format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res2");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)){
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job, res);

        //提交作业
        boolean status = job.waitForCompletion(true);
        System.out.println("作业执行状态:" + status);
        return 0;
    }


    public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            int upload = Integer.valueOf(values[values.length-3]);
            int down = Integer.valueOf(values[values.length-2]);
            context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
        }
    }

    public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
        @Override
        protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
            int upload =0;
            int down = 0;
            for (AccessLogWritable value : values) {
                upload += value.getUpload();
                down  += value.getDown();
            }
            context.write(key,new AccessLogWritable(upload,down,upload+down));
        }
    }
}

执行job作业

[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 10:33:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 10:34:00 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 10:34:00 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 10:34:01 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 10:34:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0013
19/12/20 10:34:03 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0013
19/12/20 10:34:03 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0013/
19/12/20 10:34:03 INFO mapreduce.Job: Running job: job_1576780523481_0013
19/12/20 10:34:27 INFO mapreduce.Job: Job job_1576780523481_0013 running in uber mode : false
19/12/20 10:34:27 INFO mapreduce.Job:  map 0% reduce 0%
19/12/20 10:34:43 INFO mapreduce.Job:  map 100% reduce 0%
19/12/20 10:35:00 INFO mapreduce.Job:  map 100% reduce 100%
19/12/20 10:35:02 INFO mapreduce.Job: Job job_1576780523481_0013 completed successfully
image-20191221231217047

查看结果

[root@hadoop5 ~]#  hdfs dfs -text /accesslog/res2/part-r-00000
13726230503 统计结果{上传流量=4962, 下载流量=49362,  上传下载总流量=54324}
13826544101 统计结果{上传流量=528, 下载流量=0,  上传下载总流量=528}
13926251106 统计结果{上传流量=480, 下载流量=0,  上传下载总流量=480}
13926435656 统计结果{上传流量=264, 下载流量=3024,  上传下载总流量=3288}
image-20191221231328361

十. MapReduce的高级特性

10.1 MapRedcuce的数据清洗

10.1.1 数据清洗

所谓数据清洗指的是在复杂的数据格式中获取我们需要的数据过程称之为数据清洗,整个过程仅仅是将复杂数据中我们需要的数据清洗出来,不涉及任何的统计计算工作,如下图展示过程就是数据清洗:

image-20191222105730220

10.1.2 数据清洗编程思路分析

image-20191222111700751

10.1.3 开发数据清洗

//开发数据清洗
public class DataCleanAccessLogJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DataCleanAccessLogJob(),args);
    }

    @Override
    public int run(String[] args) throws Exception {
        //创建job作业
        Job job = Job.getInstance(getConf(), "data-clean-access-log-job");
        job.setJarByClass(DataCleanAccessLogJob.class);
        //设置input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //设置map
        job.setMapperClass(DataCleanAccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //跳过shuffle 和 reduce
        job.setNumReduceTasks(0);

        //设置output format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/cleandata");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)){
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job,res);


        //提交job
        boolean status = job.waitForCompletion(true);
        System.out.println("作业提交状态 = " + status);
        return 0;
    }

     //map阶段
     public  static class DataCleanAccessLogMap extends Mapper<LongWritable, Text,Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            Text keyout = new Text(values[1]+"\t"+values[6] + "\t" + values[7]);
            context.write(keyout, NullWritable.get());
        }
    }
    //没有reduce阶段
}

注意: 设置job.setNumReduceTasks(0);这句话本次的mapreduce就跳过了reduce阶段的执行

10.1.4 运行作业

[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 13:22:03 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 13:22:07 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 13:22:07 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 13:22:08 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 13:22:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0018
19/12/20 13:22:10 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0018
19/12/20 13:22:10 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0018/
19/12/20 13:22:10 INFO mapreduce.Job: Running job: job_1576780523481_0018
19/12/20 13:22:36 INFO mapreduce.Job: Job job_1576780523481_0018 running in uber mode : false
19/12/20 13:22:36 INFO mapreduce.Job:  map 0% reduce 0%
19/12/20 13:22:51 INFO mapreduce.Job:  map 100% reduce 0%
19/12/20 13:22:53 INFO mapreduce.Job: Job job_1576780523481_0018 completed successfully
image-20191222113540419

10.1.5 查看运行结果

image-20191222113701804
[root@hadoop5 ~]# hdfs dfs -text /accesslog/cleandata/part*
13726230503 2481    24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
13726230503 2481    24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
image-20191222113759032

十一. MapReduce的高级特性

11.1 MapReduce中Map的数量

# MapReduce运行过程中Map的数量是由block所决定的: 
        也就是一个文件分为几个block就是几个map

注意:map的数量由block块决定,也就意味着一旦文件确定根据默认配置划分block也将确定,所以我们没有办法在程序中手动干预map的执行数量

11.2 MapReduce中Reduce的数量

# Reduce的数量是可以在程序中手动指定
        默认数量为:  1个 Reduce
        可以通过:    job.setNumReduceTasks(0);  0 就是没有   数字是几就是几个

11.2.1 通过修改word count案例reduce数量比较不同

使用默认reduce数量也就是1的执行结果:

image-20191222121049721
image-20191222121133848

将reduce数量修改为多个这里修改为2个,运行查看结果

                //省略............
                //设置reduce 阶段
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置reduce数量
        job.setNumReduceTasks(2);
                //省略..........

运行job作业

image-20191222121658125

查看结果

image-20191222121732060
image-20191222121849990

通过这里可以总结出:有几个reduce就会生成几个结果文件,多个reduce同时处理数据将原来一个reduce处理结果,分到了不同的reduce处理文件中,因此如果日后需要将所有结果数据汇总在一起之能设置一个reduce,如果想要将结果划分到多个文件中可以设置多个reduce数量

11.2.2 为什么要设置多个reduce数量?

# 1.提高MR的运行效率,从而快速统计计算工作

11.2.3 多个Reduce如何去分配map中数据?

一旦设置了多个reduce,如何让多个reduce均分map中统计的数据,这里面还有一个分区(Partition)概念,一个Reduce会形成一个分区,默认使用的是HashPartitioner会根据map输出key做hash运算去决定map中输出数据交给那个reduce处理

@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

image-20191222130057898

11.2.4 如何自定义分区(Partitoner)

自定义分区:可以根据业务规则将统计结果划分到不同分区中

数据格式

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200

要求

# 统计流量,并将不同省份数据的统计结果放在不同文件中

自定义分区

//自定义分区 输入数据map端结果
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {
    //根据业务规则将不同省份结果划分到不同分区
    private static  HashMap<String,Integer> provincePartitioners =  new HashMap<>();
    static{
        provincePartitioners.put("136",0);
        provincePartitioners.put("137",1);
        provincePartitioners.put("138",2);
        provincePartitioners.put("139",3);
    }

    // 返回分区号给那个reduce
    @Override
    public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
        String keyPrefix = key.toString().substring(0, 3);
        Integer partionId = provincePartitioners.get(keyPrefix);
        return partionId ==null?4: partionId;
    }
}

在job作业中指定分区

//设置分区
job.setPartitionerClass(ProvincePartitioner.class);
//设置reduce数量
job.setNumReduceTasks(5);

运行job作业

image-20191222140811595

查看结果

image-20191222140714981
image-20191222140953746

11.3 计数器(Counter)

计数器:顾名思义就是用来对map运行数量和reduce运行数量进行统计的

11.3.1 在map中使用计数器

public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //计数器
            Counter access_map = context.getCounter("map-group", "access_map");
            access_map.increment(1);
            String[] values = value.toString().split("\t");
            int upload = Integer.valueOf(values[values.length-3]);
            int down = Integer.valueOf(values[values.length-2]);
            context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
        }
    }

11.3.2 在reduce中使用计数器

public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
        @Override
        protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
            //计数器
            Counter access_map = context.getCounter("reduce-group", "access_reduce");
            access_map.increment(1);
            int upload =0;
            int down = 0;
            for (AccessLogWritable value : values) {
                upload += value.getUpload();
                down  += value.getDown();
            }
            context.write(key,new AccessLogWritable(upload,down,upload+down));
        }
    }
image-20191222155105711

11.4 Combiner 合并

Combiner合并:又称之为map端的reduce,主要是通过对map局部的数据先进行一次reduce,从而来减少map端输出数据频繁发送给Reduce处理时所带来的网络压力问题。通过这种提前对map输出做一次局部reduce,这样既可以减轻网络压力,又能提高效率。在mapreduce编程模型中默认是关闭的。

11.4.1 开启Combiner

.....................               
//设置map
job.setMapperClass(AccessLogCustomerTypeMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AccessLogWritable.class);

//shuffle 无须设置 自动处理

//设置Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);

//设置分区
job.setPartitionerClass(ProvincePartitioner.class);
...............

11.4.2 运行设置Combiner的job

没有设置Combiner时:

image-20191222160857054

设置Combiner时:

image-20191222160936568

十二. Job作业原理分析

12.1 Input Format 原理解析

12.1.1 类图和源码

查看Text InputFormat的类图

image-20191222163038681

注意:通过类图发现最顶层父类为Input Format这个类

查看Input Formt 源码:

image-20191222163205033
/** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;
  • getSplits方法:是用来根据block计算逻辑上的切片,每一个逻辑切片对应一个map操作
  • createRecordReader方法:用来将切片数据封装成一行(LongWritable,Text)

12.1.2 Input Format切片的计算方式

image-20191222165455715
image-20191228151116890

说明:

1. 当一个文件小于128M时一定会被分成一个逻辑切片,Block块与Split(切片)一一对应。

2.当一个文件大于128M,剩余大小大于切片的1.1倍,Block块与Split(切片)一一对应。反之如果一个文件大于128M,剩余大小小于切片的1.1倍,此时将划分为一个切片。

image-20191222170938132

12.1.3 将数据封装成key和value

image-20191222172100874

image-20191222172239717


12.2 Map源码原理分析

这里只给出了核心方法:

 /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
image-20191222172609418

12.3 Reduce源码分析

查看reduce的源码,这里只罗列核心代码:

**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
image-20191228141029002

12.4 OutputFormat 源码分析

查看源类图的结构:

image-20191228141731357

注意:和 InputFormat基本是类似的,最顶层父类为Output Format:

TextOutputFormat部分源码如下:

/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }
  }
image-20191228142300615

通过源码得知默认的key和value输出分割符为"tab键"也就是这里的"\t"

12.5 Shuffle的分析

shuffle阶段是一个整体叫法: 其实又分为Map端的shuffleReduce端的Shuffle

12.5.1 Map端shuffle

image-20191228200603195

12.5.2 Reduce端的shuffle

image-20191228200805109

12.6 MapReduce整体运行原理

image-20191228201000469
#1.计算切片
    有几个切片就有几个map task
#2.环形缓存区
    经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。
    环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。
#3.溢出
    环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排)
    缓存区不断溢出的数据形成多个小文件
#4.合并
    溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件
    通过归并排序保证区内的数据有序
#5.shuffle
    从过程2到过程7之间,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的,
    每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。
#6.合并
    运行reducetask的节点通过过程5,将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录。这多个分区文件通过归并排序合并成大文件,并根据key值分好组(key值相同的,value值会以迭代器的形式组在一起)。
#7.reducetask
    reducetask从本地工作目录获取已经分好组并且排好序的数据,将数据进行reduce函数中的逻辑处理。
#8.输出
    每个reducetask输出一个结果文件。

十三. MapReduce与Yarn

13.1 Job作业提交过程

img
客户端的配置信息mapreduce.framework.name为yarn时,客户端会启动YarnRunner(yarn的客户端程序),并将mapreduce作业提交给yarn平台处理。

1.向ResourceManager请求运行一个mapreduce程序。

2.ResourceManager返回hdfs地址,告诉客户端将作业运行相关的资源文件上传到hdfs。

3.客户端提交mr程序运行所需的文件(包括作业的jar包,作业的配置文件,分片信息等)到hdfs上。

4.作业相关信息提交完成后,客户端用过调用ResourcrManager的submitApplication()方法提交作业。

5.ResourceManager将作业传递给调度器,调度器的默认调度策略是先进先出。

6.调度器寻找一台空闲的节点,并在该节点隔离出一个容器(container),容器中分配了cpu,内存等资源,并启动MRAppmaster进程。

7.MRAppmaster根据需要运行多少个map任务,多少个reduce任务向ResourceManager请求资源。

8.ResourceManager分配相应数量的容器,并告知MRAppmaster容器在哪。

9.MRAppmaster启动maptask。

10.maptask从HDFS获取分片数据执行map逻辑。

11.map逻辑执行结束后,MRAppmaster启动reducetask。

12.reducetask从maptask获取属于自己的分区数据执行reduce逻辑。

13.reduce逻辑结束后将结果数据保存到HDFS上。

14.mapreduce作业结束后,MRAppmaster通知ResourceManager结束自己,让ResourceManager回收所有资源。

十四. HA的hadoop集群搭建

14.1 集群规划

# 集群规划
    10.15.0.20    zk     zknodes  通过一个节点充当整个集群
    
    10.15.0.22    hadoop22    NameNode (active)  & ZKFC
    10.15.0.23    hadoop23    NameNode (standby) & ZKFC
    10.15.0.24    hadoop24    ResourceManager(active) 
    10.15.0.25    hadoop25    ResourceManager(standby)
    10.15.0.26    hadoop26    DataNode & JournalNode  & NodeManager
    10.15.0.27    hadoop27    DataNode & JournalNode  & NodeManager
    10.15.0.28    hadoop28    DataNode & JournalNode  & NodeManager

# 克隆机器做准备工作:
    0.修改ip地址为上述ip
    1.修改主机名/etc/hostsname为上述对应主机名  修改完必须重新启动
    2.配置主机名ip地址映射/etc/hosts文件并同步所有节点
    10.15.0.20 zk
    10.15.0.22 hadoop22
    10.15.0.23 hadoop23
    10.15.0.24 hadoop24
    10.15.0.25 hadoop25
    10.15.0.26 hadoop26
    10.15.0.27 hadoop27
    10.15.0.28 hadoop28
    3.所有节点安装jdk并配置环境变量
    4.关闭所有机器的网络防火墙配置
        systemctl stop firewalld
        systemctl disable firewalld
    5.所有节点安装centos7.x搭建集群的依赖
        yum install psmisc -y
    6.配置ssh免密登录
        hadoop22 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
        hadoop23 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
        hadoop24 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点
        hadoop25 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一个节点

14.2 搭建zk集群

# 1.安装zk安装包
    [root@zk ~]# tar -zxvf zookeeper-3.4.12.tar.gz

# 2.准备zk的数据文件夹
    [root@zk ~]# mkdir zkdata1 zkdata2 zkdata3

# 3.在每个数据文件夹中准备集群唯一标识文件myid
    [root@zk ~]# echo "1" >> zkdata1/myid
    [root@zk ~]# echo "2" >> zkdata2/myid 
    [root@zk ~]# echo "3" >> zkdata3/myid 

# 4.在每个数据文件夹中准备zk的配置文件zoo.cfg
 
    [root@zk ~]# vim /root/zkdata1/zoo.cfg  
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata1
    clientPort=3001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

  [root@zk ~]# vim /root/zkdata2/zoo.cfg  
      tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata2
    clientPort=4001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

    [root@zk ~]# vim /root/zkdata3/zoo.cfg  
      tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata3
    clientPort=5001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

# 5.进入zk安装目录bin目录执行如下命令启动zk集群:
    [root@zk bin]# ./zkServer.sh start /root/zkdata1/zoo.cfg
    [root@zk bin]# ./zkServer.sh start /root/zkdata2/zoo.cfg
    [root@zk bin]# ./zkServer.sh start /root/zkdata3/zoo.cfg

# 6.进入zk安装目录bin目录执行如下命令查看集群状态
        ./zkServer.sh status /root/zkdata1/zoo.cfg
        ./zkServer.sh status /root/zkdata2/zoo.cfg
        ./zkServer.sh status /root/zkdata3/zoo.cfg

14.3 搭建hadoop的HA集群

# 1.在hadoop22--hadoop28上安装hadoop安装包
       tar -zxf hadoop-2.9.2.tar.gz 
# 2.在hadoop22--hadoop28机器上配置hadoop环境变量

# 3.在hadoop22节点上配置hadoop-env.sh文件
        export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
# 4.在hadoop22节点上配置core-site.xml文件
        [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/core-site.xml 
<!--hdfs主要入口不再是一个具体机器而是一个虚拟的名称 -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://ns</value>
</property>
<!-- hadoop临时目录位置 -->
<property>
  <name>hadoop.tmp.dir</name>
  <value>/root/hadoop-2.9.2/data</value>
</property>
<!--zk集群的所有节点-->
<property>
    <name>ha.zookeeper.quorum</name>
  <value>zk:3001,zk:4001,zk:5001</value>
</property>
# 5.在hadoop2节点上配置hdfs-site.xml文件 
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/hdfs-site.xml 
<!--指定hdfs的nameservice为ns,需要和core-site.xml中的保持一致 -->
      <property>
          <name>dfs.nameservices</name>
          <value>ns</value>
      </property>
      <!-- ns下面有两个NameNode,分别是nn1,nn2 -->
      <property>
          <name>dfs.ha.namenodes.ns</name>
          <value>nn1,nn2</value>
      </property>
        <!-- nn1的RPC通信地址 -->
      <property>
          <name>dfs.namenode.rpc-address.ns.nn1</name>
          <value>hadoop22:9000</value>
      </property>
      <!-- nn1的http通信地址 -->
      <property>
          <name>dfs.namenode.http-address.ns.nn1</name>
          <value>hadoop22:50070</value>
      </property>
    <!-- nn2的RPC通信地址 -->
      <property>
          <name>dfs.namenode.rpc-address.ns.nn2</name>
          <value>hadoop23:9000</value>
      </property>
      <!-- nn2的http通信地址 -->
      <property>
          <name>dfs.namenode.http-address.ns.nn2</name>
          <value>hadoop23:50070</value>
      </property>

    <!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hadoop26:8485;hadoop27:8485;hadoop28:8485/ns</value>
    </property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/root/journal</value>
    </property>
    <!-- 开启NameNode故障时自动切换 -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <!-- 配置失败自动切换实现方式 -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 配置隔离机制,如果ssh是默认22端口,value直接写sshfence即可 -->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>
    <!-- 使用隔离机制时需要ssh免登陆 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>
# 6.在hadoop2节点上配置yarn-site.xml文件
<!-- 开启RM高可用 -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop24</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop25</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop24:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop25:8088</value>
</property>
<!-- 指定zk集群地址 -->
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk:3001,zk:4001,zk:5001</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
# 7.在hadoop2节点上配置mapred-site.xml文件,默认不存在需要复制
    [root@hadoop22 ~]# cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template  hadoop-2.9.2/etc/hadoop/mapred-site.xml
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/mapred-site.xml
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

# 8.在hadoop2节点上配置slaves文件
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/slaves 
    hadoop26
    hadoop27
    hadoop28

# 9.同步集群配置文件
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop23:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop24:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop25:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop26:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop27:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop28:/root/hadoop-2.9.2/etc/

# 10.启动HDFS的高可用
    1.在任意NameNode上格式化ZK
        hdfs zkfc -formatZK

    2.在hadoop26 hadoop27 hadoop28启动journal node
        [root@hadoop26 ~]# hadoop-daemon.sh start journalnode
        [root@hadoop27 ~]# hadoop-daemon.sh start journalnode
        [root@hadoop28 ~]# hadoop-daemon.sh start journalnode

    3.在活跃的NameNode节点上执行格式化
        [root@hadoop22 ~]# hdfs namenode -format ns

    4.在NameNode上启动hdfs集群
        [root@hadoop22 ~]# start-dfs.sh

    5.在standby的NameNode上执行
        [root@hadoop23 ~]# hdfs namenode -bootstrapStandby

    6.在standby的NameNode执行
        [root@hadoop23 ~]# hadoop-daemon.sh start namenode

# 11.在活跃节点上启动yarn集群

    1.在活跃的resourcemang节点上执行
            [root@hadoop24 ~]# start-yarn.sh
    2.在standby的节点上执行
        [root@hadoop25 ~]# yarn-daemon.sh start resourcemanager

# 12.测试集群
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343