hadoop-mapreduce案例集

【1:】数据去重
"数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义筛选统计大数据集上的数据种类个数从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重。下面就进入这个实例的MapReduce程序设计。

注:戴#号的是重复数据
file1数据:

2012-3-1 a
2012-3-2 b
2012-3-3 c #
2012-3-4 d #
2012-3-5 a #
2012-3-6 b
2012-3-7 c
2012-3-3 c #

file2数据:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d #
2012-3-5 a #
2012-3-6 c
2012-3-7 d
2012-3-3 c #

输出结果:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

设计思路:
数据去重最终目标是让原始数据出现次数超过一次数据输出文件只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值
在MapReduce流程中,map的输出<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce。所以从设计好的reduce输入可以反推出map的输出key应为数据,value任意。继续反推,map输出数据的key为数据,而在这个实例中每个数据代表输入文件中的一行内容,所以map阶段要完成的任务就是在采用Hadoop默认的作业输入方式之后,将value设置为key,并直接输出(输出中的value任意)。map中的结果经过shuffle过程之后交给reduce。reduce阶段不会管每个key有多少个value,它直接将输入的key复制为输出的key,并输出就可以了(输出中的value被设置成空了)。

代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataNoHeavy {

    public static class DataMap extends Mapper<LongWritable, Text, Text, Text>{
        
        protected void map(LongWritable key, Text value, Context context) 
                throws java.io.IOException ,InterruptedException {
            Text line = value;
            context.write(line, new Text(""));
        };
    }
    
    public static class DataReduce extends Reducer<Text, Text, Text, Text>{
    
        protected void reduce(Text key, Iterable<Text> value, Context context) 
                throws java.io.IOException ,InterruptedException {
             context.write(key, new Text(""));
        };
    }
    
    public static void main(String[] args) {
        
            Configuration conf = new Configuration();
            //设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
            conf.set("mapred.job.tracker", "hadoop01:9001");        
            try {
                //新建一个Job工作
                Job job = new Job(conf);
                //设置运行类
                job.setJarByClass(JobRun.class);
                //设置要执行的mapper类(自己书写的)
                job.setMapperClass(DataMap.class);
                //设置要执行的reduce类(自己书写的)
                job.setReducerClass(DataReduce.class);
                //设置输出key的类型
                job.setMapOutputKeyClass(Text.class);
                //设置输出value的类型
                job.setMapOutputValueClass(Text.class);
                
                //设置ruduce任务的个数,默认个数为一个(一般reduce的个数越多效率越高)
                job.setNumReduceTasks(1);
                
                //mapreduce 输入数据的文件/目录
                FileInputFormat.addInputPath(job, new Path("/usr/input/wc/Demo/dataNoHeavy"));
                //mapreduce 执行后输出的数据目录
                FileOutputFormat.setOutputPath(job, new Path("/usr/output/Demo/dataNoHeavy"));
                //执行完毕退出
                System.exit(job.waitForCompletion(true) ? 0 : 1);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
    }
}

图例:

数据去重.png

【2】数据排序
"数据排序"是许多实际任务执行时要完成的第一项工作,比如学生成绩评比数据建立索引等。这个实例和数据去重类似,都是原始数据进行初步处理,为进一步的数据操作打好基础。下面进入这个示例。

数据文件:

file1:

2
32
654
32
15
756
65223

file2:

5956
22
650
92

file3:

26
54
6

输出结果:

1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223

设计思路:
这个实例仅仅要求对输入数据进行排序,熟悉MapReduce过程的读者会很快想到在MapReduce过程中就有排序,是否可以利用这个默认的排序,而不需要自己再实现具体的排序呢?答案是肯定的。
  但是在使用之前首先需要了解它的默认排序规则。它是按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。

了解了这个细节,我们就知道应该使用封装int的IntWritable型数据结构了。也就是在map中将读入的数据转化成IntWritable型,然后作为key值输出(value任意)。reduce拿到<key,value-list>之后,将输入的key作为value输出,并根据value-list元素个数决定输出的次数。输出的key(即代码中的linenum)是一个全局变量,它统计当前key的位次。需要注意的是这个程序中没有配置Combiner,也就是在MapReduce过程中不使用Combiner。这主要是因为使用map和reduce就已经能够完成任务了。

代码:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataSort {
    
    public static class DataSortMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        protected void map(LongWritable key, Text value, Context context) 
                throws java.io.IOException ,InterruptedException {
            String line = value.toString();
            context.write(new IntWritable(Integer.parseInt(line)), new IntWritable(1));
        };
    }
    
    public static class DataSortReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable linenum = new IntWritable(1);
        protected void reduce(IntWritable key, Iterable<IntWritable> value, Context context) 
                throws java.io.IOException ,InterruptedException {
            for(IntWritable val:value){
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get()+1);
            }
        };
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        //设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
        conf.set("mapred.job.tracker", "hadoop01:9001");        
        try {
            //新建一个Job工作
            Job job = new Job(conf);
            //设置运行类
            job.setJarByClass(JobRun.class);
            //设置要执行的mapper类(自己书写的)
            job.setMapperClass(DataSortMap.class);
            //设置要执行的reduce类(自己书写的)
            job.setReducerClass(DataSortReduce.class);
            //设置输出key的类型
            job.setOutputKeyClass(IntWritable.class);
            //设置输出value的类型
            job.setOutputValueClass(IntWritable.class);
            
            //设置ruduce任务的个数,默认个数为一个(一般reduce的个数越多效率越高)
            //job.setNumReduceTasks(1);
            
            //mapreduce 输入数据的文件/目录
            FileInputFormat.addInputPath(job, new Path("/usr/input/wc/Demo/dataSort"));
            //mapreduce 执行后输出的数据目录
            FileOutputFormat.setOutputPath(job, new Path("/usr/output/Demo/dataSort"));
            //执行完毕退出
            System.exit(job.waitForCompletion(true) ? 0 : 1 );          
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

数据排序.png

【3】平均值
"平均成绩"主要目的还是在重温经典"WordCount"例子,可以说是在基础上的微变化版,该实例主要就是实现一个计算学生平均成绩的例子。
 对输入文件中数据进行就算学生平均成绩。输入文件中的每行内容均为一个学生姓名和他相应的成绩,如果有多门学科,则每门学科为一个文件。要求在输出中每行有两个间隔的数据,其中,第一个代表学生的姓名第二个代表其平均成绩

数据:

math:

张三    88
李四    99
王五    66
赵六    77

china:

张三    78
李四    89
王五    96
赵六    67

english:

张三    80
李四    82
王五    84
赵六    86

输出结果:

张三    82
李四    90
王五    82
赵六    76

设计思路:
计算学生平均成绩是一个仿"WordCount"例子,用来重温一下开发MapReduce程序的流程。程序包括两部分的内容:Map部分和Reduce部分,分别实现了map和reduce的功能。
Map处理的是一个纯文本文件,文件中存放的数据时每一行表示一个学生的姓名和他相应一科成绩。Mapper处理的数据是由InputFormat分解过的数据集,其中InputFormat的作用是将数据集切割成小数据集InputSplit,每一个InputSlit将由一个Mapper负责处理。此外,InputFormat中还提供了一个RecordReader的实现,并将一个InputSplit解析成<key,value>对提供给了map函数。InputFormat的默认值是TextInputFormat,它针对文本文件,按行将文本切割成InputSlit,并用LineRecordReader将InputSplit解析成<key,value>对,key是行在文本中的位置,value是文件中的一行。
Map的结果会通过partion分发到Reducer,Reducer做完Reduce操作后,将通过以格式OutputFormat输出。
Mapper最终处理的结果对<key,value>,会送到Reducer中进行合并,合并的时候,有相同key的键/值对则送到同一个Reducer上。Reducer是所有用户定制Reducer类地基础,它的输入是key和这个key对应的所有value的一个迭代器,同时还有Reducer的上下文。Reduce的结果由Reducer.Context的write方法输出到文件中。

代码:


import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgSource {

    public static class AvgSourceMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        
        protected void map(LongWritable key, Text value, Context context) 
                throws java.io.IOException ,InterruptedException {
            String line = value.toString();
            StringTokenizer tok = new StringTokenizer(line, "\n");
            while(tok.hasMoreElements()){
                String name = tok.nextToken();
                String source = tok.nextToken();
                context.write(new Text(name), new IntWritable(Integer.parseInt(source)));
            }
        };
    }
    
    public static class AvgSourceReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        
        protected void reduce(Text key, Iterable<IntWritable> value, Context context) 
                throws java.io.IOException ,InterruptedException {
            int sum = 0 ;
            int count = 0;
            Iterator<IntWritable> iterator = value.iterator();
            while(iterator.hasNext()){
                sum+= iterator.next().get();
                count++;
            }
            
            int avg = (int)sum/count;
            context.write(key, new IntWritable(avg));
            
        };
    }
    
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        //设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
        conf.set("mapred.job.tracker", "hadoop01:9001");
        
        try {
            //新建一个Job工作
            Job job = new Job(conf);
            //设置运行类
            job.setJarByClass(JobRun.class);
            //设置要执行的mapper类(自己书写的)
            job.setMapperClass(AvgSourceMapper.class);
            //设置要执行的reduce类(自己书写的)
            job.setReducerClass(AvgSourceReduce.class);
            //设置输出key的类型
            job.setMapOutputKeyClass(Text.class);
            //设置输出value的类型
            job.setMapOutputValueClass(IntWritable.class);
            
            //设置ruduce任务的个数,默认个数为一个(一般reduce的个数越多效率越高)
            job.setNumReduceTasks(1);
            
            //mapreduce 输入数据的文件/目录
            FileInputFormat.addInputPath(job, new Path("/usr/input/wc/Demo/avgSource"));
            //mapreduce 执行后输出的数据目录
            FileOutputFormat.setOutputPath(job, new Path("/usr/output/Demo/avgSource"));
            //执行完毕退出
            System.exit(job.waitForCompletion(true) ? 0 : 1);
                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

平均数.png

【4】单表链接
前面的实例都是在数据上进行一些简单的处理,为进一步的操作打基础。"单表关联"这个实例要求给出的数据寻找关心的数据,它是对原始数据所包含信息的挖掘。下面进入这个实例。

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。

数据:
file

child        parent
Tom        Lucy
Tom        Jack
Jone        Lucy
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
Philip        Alma
Mark        Terry
Mark        Alma

家谱关系:

家谱关系.png

输出结果:

grandchild        grandparent
Tom              Alice
Tom              Jesse
Jone              Alice
Jone              Jesse
Tom              Mary
Tom              Ben
Jone              Mary
Jone              Ben
Philip              Alice
Philip              Jesse
Mark              Alice
Mark              Jesse

代码:


import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SingleRelation {
    
    public static int time = 0;
    /*
     * map将输出分割child和parent,然后正序输出一次作为右表,
     * 反序输出一次作为左表,需要注意的是在输出的value中必须
     * 加上左右表的区别标识。
     */
    public static class SingleRelationMap extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException ,InterruptedException {
            String child = new String();//孩子名字
            String parent = new String();//父母名字
            String relation = new String();//左右表标示
            
            // 输入的一行预处理文本
            StringTokenizer st = new StringTokenizer(value.toString());
            String[] values = new String[2];
            int i = 0;
            while(st.hasMoreTokens()){
                values[i] = st.nextToken();
                i++;
            }
            
            if(values[0].compareTo("child") != 0){
                child = values[0];
                parent = values[1];
                
                //输出左表
                relation = "1";
                context.write(new Text(parent), new Text(relation+"+"+child+"+"+parent));
                //输出右表
                relation = "2";
                context.write(new Text(child), new Text(relation+"+"+child+"+"+parent));
            }
        };
    }
    
    public static class SingleRelationReduce extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> value, Context context) 
                throws IOException ,InterruptedException {
            //输出表头
            if(0 == time){
                 context.write(new Text("grandchild"), new Text("grandparent"));
                 time++;
            }
            
            int grandchildnum = 0 ;
            String[] grandchild = new String[10];
            int grandparentnum = 0;
            String[] grandparent = new String[10];
            
            Iterator ite = value.iterator();
            while (ite.hasNext()) {
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if(0 == len){
                    continue;
                }
                
                // 取得左右表标识
                char relation = record.charAt(0);
                // 定义孩子和父母变量
                String child = new String();
                String parent = new String();
                // 获取value-list中value的child
                while(record.charAt(i) != '+'){
                    child += record.charAt(i);
                    i++;
                }
                
                i = i + 1;
                
                while(i < len){
                    parent += record.charAt(i);
                    i++;
                }
                
                // 左表,取出child放入grandchildren
                 if ('1' == relation) {
                     grandchild[grandchildnum] = child;
                     grandchildnum++;
                    }
                // 右表,取出parent放入grandparent
                 if ('2' == relation) {
                     grandparent[grandparentnum] = parent;
                     grandparentnum++;
                 }
                // grandchild和grandparent数组求笛卡尔儿积
            }
            
            if (0 != grandchildnum && 0 != grandparentnum) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        // 输出结果
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }
            }
            
        };
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "hadoop01:9001");
        
        try {
            Job job = new Job(conf);
            job.setJarByClass(JobRun.class);
            // 设置Map和Reduce处理类
            job.setMapperClass(SingleRelationMap.class);
            job.setReducerClass(SingleRelationReduce.class);
            
            // 设置输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
         // 设置输入和输出目录

            FileInputFormat.addInputPath(job, new Path("/usr/input/wc/Demo/single"));

            FileOutputFormat.setOutputPath(job, new Path("/usr/output/Demo/single"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行过程解析

map处理

child        parent                àà                    忽略此行
Tom        Lucy                   àà                <Lucy,1+Tom+Lucy>
                                                    <Tom,2+Tom+Lucy >
Tom        Jack                    àà                <Jack,1+Tom+Jack>
                                                    <Tom,2+Tom+Jack>
Jone        Lucy                 àà                <Lucy,1+Jone+Lucy>
                                                    <Jone,2+Jone+Lucy>
Jone        Jack                    àà                <Jack,1+Jone+Jack>
                                                    <Jone,2+Jone+Jack>
Lucy        Mary                   àà                <Mary,1+Lucy+Mary>
                                                    <Lucy,2+Lucy+Mary>
Lucy        Ben                    àà                <Ben,1+Lucy+Ben>
                                                     <Lucy,2+Lucy+Ben>
Jack        Alice                    àà                <Alice,1+Jack+Alice>
                                                      <Jack,2+Jack+Alice>
Jack        Jesse                   àà                <Jesse,1+Jack+Jesse>
                                                      <Jack,2+Jack+Jesse>
Terry        Alice                   àà                <Alice,1+Terry+Alice>
                                                      <Terry,2+Terry+Alice>
Terry        Jesse                  àà                <Jesse,1+Terry+Jesse>
                                                      <Terry,2+Terry+Jesse>
Philip        Terry                  àà                <Terry,1+Philip+Terry>
                                                      <Philip,2+Philip+Terry>
Philip        Alma                   àà                <Alma,1+Philip+Alma>
                                                      <Philip,2+Philip+Alma>
Mark        Terry                   àà                <Terry,1+Mark+Terry>
                                                      <Mark,2+Mark+Terry>
Mark        Alma                 àà                <Alma,1+Mark+Alma>
                                                      <Mark,2+Mark+Alma>

Shuffle处理

shuffle.png

reduce处理
首先由语句"
0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中没有左表或者右表,则不会做处理,可以根据这条规则去除无效shuffle连接*。

2016-11-20_213049.png
然后根据下面语句进一步对有效的shuffle连接做处理。
// 左表,取出child放入grandchildren
if ('1' == relationtype) {
    grandchild[grandchildnum] = childname;
    grandchildnum++;
}

// 右表,取出parent放入grandparent
if ('2' == relationtype) {
    grandparent[grandparentnum] = parentname;
    grandparentnum++;
}
针对一条数据进行分析:
<Jack,1+Tom+Jack,
        1+Jone+Jack,
        2+Jack+Alice,
        2+Jack+Jesse >

分析结果左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表与右表连接键。而"value-list"表示以"key"连接左表与右表相关数据
根据上面针对左表与右表不同的处理规则,取得两个数组的数据如下所示:

2016-11-20_213624.png

然后根据下面语句进行处理。

for (int m = 0; m < grandchildnum; m++) {
    for (int n = 0; n < grandparentnum; n++) {
        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
    }
}
2016-11-20_214216.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容