Hadoop 反转排序--Order Inversion

一、前言


在这篇文章里我们要讨论的是排序反转模式。这种模式利用MapReduce的排序(sorting)阶段,让一部分数据提前发送到reducer端以利于后续计算,如果你对MapReduce了解不多,我劝你读下去,因为我将展示给你如何使用排序(sorting)和partitioner来实现我们的目的,这将会大有益处。
做完整个示例后,我觉得反转排序和二次排序是一样的道理。

  • 如下所示文件:
    [hadoop@master OrderInversion]$ cat orderinversion.txt 
    java is a great language
    java is a programming language
    java is green fun language
    java is great
    programming with java is fun
    
    
  • 要对其进行相对词频统计
    [hadoop@master OrderInversion]$ hdfs dfs -cat test_output/part*
    (a,great)       0.125
    (a,is)  0.25
    (a,java)        0.25
    (a,language)    0.25
    (a,programming) 0.125
    (fun,green)     0.2
    (fun,is)        0.4
    (fun,java)      0.2
    (fun,language)  0.2
    (great,a)       0.2
    (great,is)      0.4
    (great,java)    0.2
    (great,language)        0.2
    (green,fun)     0.25
    (green,is)      0.25
    (green,java)    0.25
    (green,language)        0.25
    (is,a)  0.14285714285714285
    (is,fun)        0.14285714285714285
    (is,great)      0.14285714285714285
    (is,green)      0.07142857142857142
    (is,java)       0.35714285714285715
    (is,programming)        0.07142857142857142
    (is,with)       0.07142857142857142
    (java,a)        0.16666666666666666
    (java,fun)      0.08333333333333333
    (java,great)    0.08333333333333333
    (java,green)    0.08333333333333333
    (java,is)       0.4166666666666667
    (java,programming)      0.08333333333333333
    (java,with)     0.08333333333333333
    (language,a)    0.3333333333333333
    (language,fun)  0.16666666666666666
    (language,great)        0.16666666666666666
    (language,green)        0.16666666666666666
    (language,programming)  0.16666666666666666
    (programming,a) 0.2
    (programming,is)        0.2
    (programming,java)      0.2
    (programming,language)  0.2
    (programming,with)      0.2
    (with,is)       0.3333333333333333
    (with,java)     0.3333333333333333
    (with,programming)      0.3333333333333333
    [hadoop@master OrderInversion]$ cat orderinversion.txt 
    java is a great language
    java is a programming language
    java is green fun language
    java is great
    programming with java is fun
    [hadoop@master OrderInversion]$ 
    
    

二、 实现示例

  • 1、 WordPair.java
    修改WordPair类的compareTo方法,让发现 “*” 为右词的对象排到前列。

    [hadoop@master OrderInversion]$ cat src/com/orderinversion/WordPair.java 
    package com.orderinversion;
    
    import org.apache.hadoop.io.WritableComparable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class WordPair implements WritableComparable<WordPair> {
       private String word;
       private String neighbor;
       
       public WordPair() {
       }
    
       public void setWord(String word) {
           this.word = word;
       }
       public void setNeighbor(String neighbor) {
           this.neighbor = neighbor;
       }
    
       public String getWord() {
           return word;
       }
       public String getNeighbor() {
           return neighbor;
       }
    
       @Override
       public void write(DataOutput out) throws IOException {
           out.writeUTF(word);
           out.writeUTF(neighbor);
       }
       @Override
       public void readFields(DataInput in) throws IOException {
           word = in.readUTF();
           neighbor = in.readUTF();
       }
    
       @Override
       public int compareTo(WordPair other) {
            int ret = word.compareTo(other.getWord());
            if (ret != 0) {
                return ret;
            }
    
            if (neighbor.equals("*")) {
                return -1;
            }
            if (other.getNeighbor().equals("*")) {
                return 1;
            }
    
            return neighbor.compareTo(other.getNeighbor());
       }
    
       @Override
       public String toString(){
           return "(" + word + "," + neighbor + ")";
       }
    }
    
    
  • 2、 WordPairPartitoner.java
    用key的hashcode对reducer数取模,就把key分配到了不同的reducer,这就是shuffle过程。但我们的WordPair 对象包含2个词,计算整个对象的hashcode是行不通的。我们需要写一个自己的Partitioner, 它在选择将输出发送到哪个reducer的时候只考虑左边的词。

    [hadoop@master OrderInversion]$ cat src/com/orderinversion/WordPairPartitoner.java 
    package com.orderinversion;
    
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class WordPairPartitoner extends Partitioner<WordPair,Integer> {
        @Override
        public int getPartition(WordPair wordPair,Integer integer,int numPartitions) {
            return wordPair.getWord().hashCode() % numPartitions;
        }
    }
    
    
  • 3、 RelativeFrequencyMapper.java
    首先我们要对mapper做一些有别于配对方法的修改。在每次循环的最后,输出了某个词的所有的词对之后,输出一个特殊的词对(“word”,”*”), 计数就是这个词作为左边词的词对出现的次数。
    现在我们找到了统计特定词出现次数的办法,我们还需要想办法让这个特定的词对称为reduce处理的第一条记录以便计算相对频度。我们可以通过修改WordPair对象的compareTo方法在MapReduce 的sorting阶段来实现这个目的。

    [hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequencyMapper.java 
    package com.orderinversion;
    
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class RelativeFrequencyMapper extends Mapper<LongWritable,Text,WordPair,IntWritable> {
        private WordPair wordPair = new WordPair();
        private IntWritable ONE = new IntWritable(1);
        private IntWritable totalCount = new IntWritable();
    
        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {
            int neighbors = context.getConfiguration().getInt("neighbors",2);
            String[] tokens = value.toString().split(" ");
            if (tokens.length < 2) {
                return;
            }
    
            for (int i=0;i<tokens.length;i++) {
                wordPair.setWord(tokens[i]);
                int start = (i-neighbors<0)?0:i-neighbors;
                int end = (i+neighbors>=tokens.length)?tokens.length-1:i+neighbors;
    
                for (int j=start;j<=end;j++) {
                    if (i == j) {
                        continue;
                    }
                    wordPair.setNeighbor(tokens[j]);
                    context.write(wordPair,ONE);
                }
                
                wordPair.setNeighbor("*");
                totalCount.set(end - start);
                context.write(wordPair,totalCount);
            }
        }
    }
    
    
  • 4、 RelativeFrequencyReducer.java
    写一个reducer来实现倒序模式很简单。引入一个计数变量以及一个表示当前词的“current”变量。reducer会检查作为输入key的WordPair 右边是不是特殊字符“*”。假如左边的词不等于“current”表示的词就重置计数变量,并且计算current表示的词的总次数。然后处理下一个WordPair对象,在同一个current范围内,计数之和与各个不同右词的计数结合就可以得到相对频率。继续这个过程直到发现另一个词(左词)然后再重新开始。

    [hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequencyReducer.java 
    package com.orderinversion;
    
    import java.io.IOException;
    
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    
    public class RelativeFrequencyReducer extends Reducer<WordPair,IntWritable,WordPair,DoubleWritable>  {
        private DoubleWritable totalCount = new DoubleWritable();
        private DoubleWritable relativeCount = new DoubleWritable();
        private String currentWord = "NOT SET";
        private String flag = "*";
    
        @Override 
        protected void reduce(WordPair key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException {
            if (key.getNeighbor().equals(flag)) {
                if (key.getWord().equals(currentWord)) {
                    totalCount.set(totalCount.get()+getTotalCount(values));
                }else {
                    currentWord = key.getWord();
                    totalCount.set(0);
                    totalCount.set(getTotalCount(values));
                }
            }else {
                    int count = getTotalCount(values);
                    relativeCount.set((double)count/totalCount.get());
                    context.write(key,relativeCount);
            }
        }
    
    
        private int getTotalCount(Iterable<IntWritable> values) {
            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            return count;
        }
    }
    
    
  • 5、 RelativeFrequency.java

    [hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequency.java 
    package com.orderinversion;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.FileSystem;
    
    public class RelativeFrequency {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(RelativeFrequency.class);
            job.setJobName("OrderInversion");
            
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            Path outPath = new Path(args[1]);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outPath)) {
                fs.delete(outPath,true);
            }
            FileOutputFormat.setOutputPath(job, outPath); 
            
            //job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(WordPair.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setOutputKeyClass(WordPair.class);
            job.setOutputValueClass(DoubleWritable.class);
    
            job.setMapperClass(RelativeFrequencyMapper.class);
            job.setReducerClass(RelativeFrequencyReducer.class);
            job.setPartitionerClass(WordPairPartitoner.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    }
    
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容