I. Introduction
In this article we discuss the order inversion pattern. The pattern leverages MapReduce's sorting phase to deliver one piece of data to the reducer ahead of the rest, so it is available for the computation that follows. If you don't know much about MapReduce, I encourage you to read on: I will show how to use sorting and a custom partitioner to achieve this, and the technique pays off nicely.
After working through the whole example, I find that order inversion rests on the same idea as secondary sort.
- Given the following input file:
[hadoop@master OrderInversion]$ cat orderinversion.txt
java is a great language
java is a programming language
java is green fun language
java is great
programming with java is fun
- We want to compute the relative frequency of each co-occurring word pair:
[hadoop@master OrderInversion]$ hdfs dfs -cat test_output/part*
(a,great)	0.125
(a,is)	0.25
(a,java)	0.25
(a,language)	0.25
(a,programming)	0.125
(fun,green)	0.2
(fun,is)	0.4
(fun,java)	0.2
(fun,language)	0.2
(great,a)	0.2
(great,is)	0.4
(great,java)	0.2
(great,language)	0.2
(green,fun)	0.25
(green,is)	0.25
(green,java)	0.25
(green,language)	0.25
(is,a)	0.14285714285714285
(is,fun)	0.14285714285714285
(is,great)	0.14285714285714285
(is,green)	0.07142857142857142
(is,java)	0.35714285714285715
(is,programming)	0.07142857142857142
(is,with)	0.07142857142857142
(java,a)	0.16666666666666666
(java,fun)	0.08333333333333333
(java,great)	0.08333333333333333
(java,green)	0.08333333333333333
(java,is)	0.4166666666666667
(java,programming)	0.08333333333333333
(java,with)	0.08333333333333333
(language,a)	0.3333333333333333
(language,fun)	0.16666666666666666
(language,great)	0.16666666666666666
(language,green)	0.16666666666666666
(language,programming)	0.16666666666666666
(programming,a)	0.2
(programming,is)	0.2
(programming,java)	0.2
(programming,language)	0.2
(programming,with)	0.2
(with,is)	0.3333333333333333
(with,java)	0.3333333333333333
(with,programming)	0.3333333333333333
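As a quick sanity check of these numbers, consider the left word great. With a neighbor window of 2, great co-occurs with is, a, and language in the line "java is a great language" (3 neighbors), and with java and is in the line "java is great" (2 neighbors), for a total of 5. The pair (great,is) appears twice, so its relative frequency is 2/5 = 0.4, matching the output above; each of the other neighbors appears once, giving 1/5 = 0.2.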
II. Implementation Example
1. WordPair.java
We modify the compareTo method of the WordPair class so that any pair whose right word is the special "*" sorts ahead of all other pairs with the same left word. (Note that two identical special keys must compare as equal, otherwise their values would not be grouped into a single reduce call.)

[hadoop@master OrderInversion]$ cat src/com/orderinversion/WordPair.java
package com.orderinversion;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class WordPair implements WritableComparable<WordPair> {
    private String word;
    private String neighbor;

    public WordPair() {
    }

    public void setWord(String word) {
        this.word = word;
    }

    public void setNeighbor(String neighbor) {
        this.neighbor = neighbor;
    }

    public String getWord() {
        return word;
    }

    public String getNeighbor() {
        return neighbor;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeUTF(neighbor);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        neighbor = in.readUTF();
    }

    @Override
    public int compareTo(WordPair other) {
        // Order on the left word first.
        int ret = word.compareTo(other.getWord());
        if (ret != 0) {
            return ret;
        }
        // Within the same left word, the special "*" pair sorts first,
        // so the reducer sees the word's total count before any of its
        // neighbor pairs. Two "*" keys for the same word are equal.
        if (neighbor.equals("*")) {
            return other.getNeighbor().equals("*") ? 0 : -1;
        }
        if (other.getNeighbor().equals("*")) {
            return 1;
        }
        return neighbor.compareTo(other.getNeighbor());
    }

    @Override
    public String toString() {
        return "(" + word + "," + neighbor + ")";
    }
}
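To make the ordering concrete, here is a minimal check; the class WordPairOrderingCheck is made up purely for illustration and is not part of the job:

package com.orderinversion;

// Hypothetical helper, only to illustrate the sort order produced by
// WordPair.compareTo; it is not part of the MapReduce job itself.
public class WordPairOrderingCheck {
    public static void main(String[] args) {
        WordPair star = new WordPair();
        star.setWord("java");
        star.setNeighbor("*");

        WordPair pair = new WordPair();
        pair.setWord("java");
        pair.setNeighbor("is");

        // The "*" pair sorts before any real neighbor of the same word,
        // so the reducer receives (java,*) first.
        System.out.println(star.compareTo(pair));  // negative
        System.out.println(pair.compareTo(star));  // positive
    }
}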
2. WordPairPartitoner.java
By default, the shuffle assigns a key to a reducer by taking the key's hashCode modulo the number of reducers. But our WordPair object contains two words, so hashing the whole object will not work: the pair (word,"*") and the pairs (word,neighbor) must all reach the same reducer. We therefore write our own Partitioner that considers only the left word when choosing which reducer an output goes to.

[hadoop@master OrderInversion]$ cat src/com/orderinversion/WordPairPartitoner.java
package com.orderinversion;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPairPartitoner extends Partitioner<WordPair, IntWritable> {
    @Override
    public int getPartition(WordPair wordPair, IntWritable count, int numPartitions) {
        // Partition on the left word only, so (word,"*") and every
        // (word,neighbor) pair land on the same reducer. Mask off the
        // sign bit: hashCode() can be negative, and a negative
        // partition number would make the job fail.
        return (wordPair.getWord().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
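A small sketch of how this plays out; PartitionCheck is a hypothetical demo class, not part of the job:

package com.orderinversion;

import org.apache.hadoop.io.IntWritable;

// Hypothetical demo: every pair sharing a left word lands in the
// same partition, regardless of its right word.
public class PartitionCheck {
    public static void main(String[] args) {
        WordPairPartitoner partitioner = new WordPairPartitoner();

        WordPair star = new WordPair();
        star.setWord("java");
        star.setNeighbor("*");

        WordPair pair = new WordPair();
        pair.setWord("java");
        pair.setNeighbor("is");

        // Only the left word "java" is hashed, so with, say, 4 reducers
        // both pairs are sent to the same one.
        int p1 = partitioner.getPartition(star, new IntWritable(5), 4);
        int p2 = partitioner.getPartition(pair, new IntWritable(1), 4);
        System.out.println(p1 == p2);  // true
    }
}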
3. RelativeFrequencyMapper.java
First, the mapper needs some changes compared to the plain pairs approach. At the end of each loop iteration, after emitting all the word pairs for a given word, it emits one special pair ("word","*") whose count is the number of pairs that have this word on the left.
Now we have a way to count how often each word occurs on the left. We still need the special pair to be the first record the reducer processes for that word, so the relative frequencies can be computed. We achieve this in MapReduce's sorting phase through the modified compareTo method of WordPair shown above.

[hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequencyMapper.java
package com.orderinversion;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RelativeFrequencyMapper extends Mapper<LongWritable, Text, WordPair, IntWritable> {
    private WordPair wordPair = new WordPair();
    private IntWritable ONE = new IntWritable(1);
    private IntWritable totalCount = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        int neighbors = context.getConfiguration().getInt("neighbors", 2);
        String[] tokens = value.toString().split(" ");
        if (tokens.length < 2) {
            return;
        }
        for (int i = 0; i < tokens.length; i++) {
            wordPair.setWord(tokens[i]);
            // Clamp the neighbor window to the line boundaries.
            int start = (i - neighbors < 0) ? 0 : i - neighbors;
            int end = (i + neighbors >= tokens.length) ? tokens.length - 1 : i + neighbors;
            for (int j = start; j <= end; j++) {
                if (i == j) {
                    continue;
                }
                wordPair.setNeighbor(tokens[j]);
                context.write(wordPair, ONE);
            }
            // Emit the special (word,"*") pair carrying the number of
            // neighbors of this word, i.e. the window size minus itself.
            wordPair.setNeighbor("*");
            totalCount.set(end - start);
            context.write(wordPair, totalCount);
        }
    }
}
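For example, for an input line such as "java is great" with the default window of 2, the mapper emits:

(java,is) 1, (java,great) 1, (java,*) 2
(is,java) 1, (is,great) 1, (is,*) 2
(great,java) 1, (great,is) 1, (great,*) 2

Each word's "*" pair carries the size of its neighborhood, which the reducer later sums into the word's total count.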
4. RelativeFrequencyReducer.java
Writing a reducer for the order inversion pattern is simple. We introduce a running total and a "current" variable holding the word being processed. The reducer checks whether the right word of the incoming WordPair key is the special "*". If it is and the left word differs from "current", we reset the total and record the overall count for the new current word. For every subsequent WordPair with the same left word, dividing that pair's count by the total gives its relative frequency. The process starts over as soon as a new left word appears.

[hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequencyReducer.java
package com.orderinversion;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class RelativeFrequencyReducer extends Reducer<WordPair, IntWritable, WordPair, DoubleWritable> {
    private DoubleWritable totalCount = new DoubleWritable();
    private DoubleWritable relativeCount = new DoubleWritable();
    private String currentWord = "NOT SET";
    private String flag = "*";

    @Override
    protected void reduce(WordPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        if (key.getNeighbor().equals(flag)) {
            if (key.getWord().equals(currentWord)) {
                // Defensive: accumulate if another "*" key arrives
                // for the same word.
                totalCount.set(totalCount.get() + getTotalCount(values));
            } else {
                // A new left word: reset and record its total count.
                currentWord = key.getWord();
                totalCount.set(getTotalCount(values));
            }
        } else {
            // Thanks to the sort order, totalCount already holds the
            // total for this left word, so the relative frequency is
            // a single division.
            int count = getTotalCount(values);
            relativeCount.set((double) count / totalCount.get());
            context.write(key, relativeCount);
        }
    }

    private int getTotalCount(Iterable<IntWritable> values) {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        return count;
    }
}
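To see the pattern in action with the sample input above, trace the reducer for the left word great: it first receives the key (great,*) with values summing to 5 (3 from "java is a great language", 2 from "java is great") and sets totalCount to 5. Then (great,a) with count 1 yields 1/5 = 0.2, (great,is) with count 2 yields 2/5 = 0.4, and (great,java) and (great,language) each yield 0.2, matching the output shown earlier.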
5. RelativeFrequency.java
[hadoop@master OrderInversion]$ cat src/com/orderinversion/RelativeFrequency.java
package com.orderinversion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RelativeFrequency {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(RelativeFrequency.class);
        job.setJobName("OrderInversion");

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Delete the output directory if it already exists, so the job
        // can be rerun without manual cleanup.
        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(WordPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WordPair.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setMapperClass(RelativeFrequencyMapper.class);
        job.setReducerClass(RelativeFrequencyReducer.class);
        job.setPartitionerClass(WordPairPartitoner.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
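For completeness, compiling and launching the job might look like the following; the jar name and the classes directory are assumptions for illustration, not taken from the original session:

[hadoop@master OrderInversion]$ mkdir -p classes
[hadoop@master OrderInversion]$ javac -cp $(hadoop classpath) -d classes src/com/orderinversion/*.java
[hadoop@master OrderInversion]$ jar cf orderinversion.jar -C classes .
[hadoop@master OrderInversion]$ hdfs dfs -put orderinversion.txt .
[hadoop@master OrderInversion]$ hadoop jar orderinversion.jar com.orderinversion.RelativeFrequency orderinversion.txt test_output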