1.分布式计算思想:
1.1基本思想:mapreduce是两个操作步骤,即映射和规约也是这个分布式计算的思想。即实现一个指定的Map映射函数,用来把一组键值对映射成新的键值对,再把新的键值对发送个Reduce规约函数,用来保证所有映射的键值对中的每一个共享相同的键组。
1.2执行流程:HDFS上的数据与map任务沟通时会被切分split一个split对应一个map ,块和split数目不一定相同,每一个reduce任务对应一个文件。结果存放在目录中
map任务运行在节点上(一个节点可以运行多个map任务,但是一个map任务不能跨多个节点上运行)reduce任务与map同理进入map键值对k1
v1原始数据
1.3处理计算步骤:
1.3.1. map任务处理
1.1读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2.写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3对输出的key、value进行分区。
1.4.对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5分组后的数据进行归约。
1.3.2.reduce任务处理
2.1.对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2.对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、values处理,转换成新的key、value输出。
2.3把reduce的输出保存到HDFS文件中,k3
v3是结果数据,k2 v2是中间数据,k1 v1是初始数据
某些类的数据,送到某些reduce中,这个过程被称为shuffle——数据分配过程map和reducer在伪分布式下是在第一个节点的map任务数量一般一个节点会有2个任务,输出流hdfs完成
2.业务逻辑:
移动公司日志,一般记录以下几个数据,手机号码,上行流量,下行流量。目的统计一个手机号码某一个时间段的产生的总流量。并按照一定的规则排序。
首先把这个庞大的数据源从本地上传到Hdfs上,被逻辑切分多个块,然后进行分布式并行计算统计总流量。
3.具体步骤:
3.1.继承Mapper类重写map方法
这个步骤就是把大文件分成若干个split,每个split对应一个map任务,比如说计算一个136字段的手机号的总流量。
3.2.继承Reducer类重写reduce方法
这个步骤是对map处理的数据进行汇总,比如分区,排序。
3.3.新建驱动Job加载reduce和map方法
Mapreduce代码:
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DataCount {
publicstatic class DCMapper extends Mapper{
protectedvoid map(LongWritable key, Text value, Context context)
throwsIOException, InterruptedException {
//把文本数据转化String类型便于后续分割
Stringline = value.toString();
//分割数据
String[]fields = line.split("\t");
Stringtel = fields[1];
longup = Long.parseLong(fields[8]);
longdown = Long.parseLong(fields[9]);
DataBeanbean = new DataBean(tel, up, down); //封装的对象
//send
context.write(newText(tel), bean);//提交给reduce
}
}
publicstatic class DCReducer extends Reducer{
@Override
protectedvoid reduce(Text key, Iterable values, Context context)
throwsIOException, InterruptedException {
longup_sum = 0;
longdown_sum = 0;
for(DataBeanbean : values){
up_sum+= bean.getUpPayLoad();
down_sum+= bean.getDownPayLoad();
}
DataBeanbean = new DataBean("", up_sum, down_sum);
context.write(key,bean);
}
}
publicstaticclass Partitioner1 extendsPartitioner{
publicint getPartition(Text key, DataBean value, int numPartitions)
{
Stringaccount=key.toString();
Stringsub_acc=account.substring(0,3);
return0;
}
}
publicstatic void main(String[] args) throws Exception
{
Configurationconf = new Configuration();
Jobjob = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.waitForCompletion(true);
}
}
4.对于对象的封装
封装代码:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataBean implements Writable{
privateString tel;
privatelong upPayLoad;
privatelong downPayLoad;
privatelong totalPayLoad;
publicDataBean(){}
publicDataBean(String tel, long upPayLoad, long downPayLoad) {
super();
this.tel= tel;
this.upPayLoad= upPayLoad;
this.downPayLoad= downPayLoad;
this.totalPayLoad= upPayLoad + downPayLoad;
}
publicString toString() {
returnthis.upPayLoad + "\t" + this.downPayLoad + "\t" +this.totalPayLoad;
}
publicvoid write(DataOutput out) throws IOException {
out.writeUTF(tel);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
publicvoid readFields(DataInput in) throws IOException {
this.tel= in.readUTF();
this.upPayLoad= in.readLong();
this.downPayLoad= in.readLong();
this.totalPayLoad= in.readLong();
}
publicString getTel() {
returntel;
}
publicvoid setTel(String tel) {
this.tel= tel;
}
publiclong getUpPayLoad() {
returnupPayLoad;
}
publicvoid setUpPayLoad(long upPayLoad) {
this.upPayLoad= upPayLoad;
}
publiclong getDownPayLoad() {
returndownPayLoad;
}
publicvoid setDownPayLoad(long downPayLoad) {
this.downPayLoad= downPayLoad;
}
publiclong getTotalPayLoad() {
returntotalPayLoad;
}
publicvoid setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad= totalPayLoad;
}
}
5.业务处理排序:
排序代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class paixu extendsMapper
{
publicstatic void main(String[] args) throws IOException {
Configurationconf=new Configuration();
Jobjob=Job.getInstance(conf);
job.setJarByClass(paixu.class);
//job.setMapperClass();
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(bean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
job.setReducerClass(r.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(bean.class);
FileOutputFormat.setOutputPath(job,new Path(args[1]));
}
privateText k=new Text();
privatebean v=new bean();
protectedvoid map(LongWritable key, Text value,Context context)
throwsIOException, InterruptedException {
Stringline=value.toString();
String[]fields=line.split("\t");
Stringaccount=fields[0];
doublein=Double.parseDouble(fields[1]);
doubleout=Double.parseDouble(fields[2]);
k.set(account);
v.set(account,in, out);
context.write(k,v);
}
publicstatic class r extends Reducer{
privatebean v =new bean();
protectedvoid reduce(Text key, Iterable value,Context context)
throwsIOException, InterruptedException {
doubleinsum=0;
doubleoutsum=0;
for(beano:value)
{
insum+=o.getIncome();
outsum+=o.getExpenses();
}
v.set("",insum, outsum);
context.write(key,v);
}
}
}
6.运行结果:
总结:在写Mapreduce中体会到只要做好自己三个步骤,剩下的框架都会自动做好,不需要考虑,但是在自己做的三个步骤中最难的就是业务逻辑的处理。所以在Mapreduce分布式并行计算中业务逻辑处理是重中之重。
�