举例理解MapReduce—排序

例子实现目标

该代码实现的是在输入的数据对中,先以第一列由小到大排序,如果第一列值相等,以第二列由小到大排序。即:
添加cp.txt文件到input文件夹

$vim cp.txt
$hadoop fs -put cp.txt /input/

5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5

输出结果

1,2
1,3
1,4
2,3
2,5,
3,2
4,3
5,1

附图:

image.png

实践例子

1.终端执行>start-all.sh
2.input文件夹下增加cp.txt文件
3.打开eclipse
4.新建mapreduce项目,新建包(命名mr),新建类(命名MySortClass )类代码如下:
5.右键,选择run as hadoop
6.右键refresh一下hadoop文件,成功后output下会出现成功排序的结果文件

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.MyWordCount.MyMapper;
import mr.MyWordCount.MyReduce;

public class MySortClass {
    
    static class MySortMapper  extends  Mapper<LongWritable, Text, A, NullWritable>{  
        
         public void map(LongWritable k1, Text v1, Context context) 
                         throws java.io.IOException, java.lang.InterruptedException
         {
            String[]  lines= v1.toString().split(",");
             
                A  a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));
             
            context.write(a1, NullWritable.get());
            System.out.println("map......");
         }
        
    }
    
    static class  MySortReduce extends Reducer<A, NullWritable, Text, Text>{
         public void reduce(A k2, Iterable<NullWritable> v2, Context context) throws java.io.IOException, java.lang.InterruptedException
         {
              
              
             context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));    
             
             System.out.println("reduce......");
         }
            
    }
    
    private static class  A implements WritableComparable<A> {
        long firstNum;
        long secondNum;
 
        public A() {
        }
 
        public A(long first, long second) {
            firstNum = first;
            secondNum = second;
        }
 
       
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }
    
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }
 
        /*
         * 当key进行排序时会调用以下这个compreTo方法
         */
        @Override
        public int compareTo(A anotherKey) {
            long min = firstNum - anotherKey.firstNum;
            if (min != 0) {
                // 说明第一列不相等,则返回两数之间小的数
                return (int) min;
            } else {
                return (int) (secondNum - anotherKey.secondNum);
            }
        }
    }
    private static String INPUT_PATH="hdfs://master:9000/input/cp.txt";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws  Exception {
        Configuration  conf=new Configuration();
        FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
     
        if(fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH));
        
        Job  job=new Job(conf,"myjob");
        
        job.setJarByClass(MySortClass.class);
        job.setMapperClass(MySortMapper.class);
        job.setReducerClass(MySortReduce.class);
        //job.setCombinerClass(MySortReduce.class);
         
    
        job.setMapOutputKeyClass(A.class);
        job.setMapOutputValueClass(NullWritable.class); 
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

    /*如果map和reduce的<key,value>类型是一样的,
则仅设置job.setOutputKeyClass();job.setOutputValueClass();即可*/

        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}

部分代码理解

类A实现了WritableComparable,设置了两个属性firstNum; secondNum;

String[] lines= v1.toString().split(",");

读取一行(5,1)以逗号分隔,两个元素(5)(1)存入数组lines

A a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));

Long.parseLong(lines[0])将string类型的“5”转化为long类型,a1.firstNum=5;a1.secondNum=1;

context.write(a1, NullWritable.get());

写入上下文,设置map的输出为<key,空>,不能使用new NullWritable()来定义,获取空值只能NullWritable.get()来获取

context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));

reduce生成新的键值对,如:将<(5,1),null>转化为<5,1>

以map->reduce集群处理流程理解该例子(假设文件庞大)

1.首先对输入文件分片(inputSplit),假设分片大小为三行,那么分为三片:

5,1
3,2
1,3

4,3
2,3
1,4

1,2
2,5

2.三片交由三个map进程处理,生成键值对<a1,null>,为减少带宽负荷,在本地节点上做了排序,分区(partitioner,数据做了分区标记)输出结果:
(如果有需要在分区之前还可以进行combiner(本地reduce操作,详情请见文章《了解MapReduce》底部对combiner的解释),这里分区之前不需要combiner)

<(1,3),null>
<(3,2),null>
<(5,1),null>

<(1,4),null>
<(2,3),null>
<(4,3),null>

<(1,2),null>
<(2,5),null>

  1. 然后就是所有节点洗牌(shuffle),将各个节点上同个分区的数据放置到一个节点中,放置过去后做了排序:

<(1,2),null>
<(1,3),null>
<(1,4),null>

<(2,3),null>
<(2,5),null>

<(3,2),null>

<(4,3),null>

<(5,1),null>

  1. 最后就是reduce,生成新键值对并生成最后排序结果

(1,2)
(1,3)
(1,4)
(2,3)
(2,5)
(3,2)
(4,3)
(5,1)

总的来说就是:map(本地)->combiner(本地)->partitioner(本地)->shuffle(集群)->reduce(新本地),各部分又还有细节操作,combiner和partitioner属于map阶段的,shuffle属于reduce阶段的。
附图理解:

Paste_Image.png
因为对hadoop源码还不是很熟悉,所以不能很好地解释代码,欢迎大家建议和指导。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容