Pipeline详解及Spark MLlib使用示例(Scala/Java/Python)【转】

本文中,我们介绍机器学习管道的概念。机器学习管道提供一系列基于数据框的高级的接口来帮助用户建立和调试实际的机器学习管道。

管道里的主要概念
MLlib提供标准的接口来使联合多个算法到单个的管道或者工作流,管道的概念源于scikit-learn项目。

1.数据框:机器学习接口使用来自Spark SQL的数据框形式数据作为数据集,它可以处理多种数据类型。比如,一个数据框可以有不同的列存储文本、特征向量、标签值和预测值。
2.转换器:转换器是将一个数据框变为另一个数据框的算法。比如,一个机器学习模型就是一个转换器,它将带有特征数据框转为预测值数据框。
3.估计器:估计器是拟合一个数据框来产生转换器的算法。比如,一个机器学习算法就是一个估计器,它训练一个数据框产生一个模型。
4.管道:一个管道串起多个转换器和估计器,明确一个机器学习工作流。
5.参数:管道中的所有转换器和估计器使用共同的接口来指定参数。

数据框
机器学习算法可以应用于多种类型的数据,如向量、文本、图像和结构化数据。管道接口中采用来自Spark SQL的数据框来支持多种类型的数据。可以查看Spark SQLdatatype reference来了解数据框支持的基础和结构化数据类型。除了Spark SQL指南中提到的数据类型外,数据框还可以使用机器学习向量类型。可以显式地建立数据框或者隐式地从规则的RDD建立数据框,下面的代码将会给出示例。数据框中的列需要命名。代码中的示例使用如“text”,“features“和”label“的名字。

管道组件

转换器
转换器包含特征变化和学习模型。技术上来说,转化器通过方法transform(),在原始数据上增加一列或者多列来将一个数据框转为另一个数据框。如:
1.一个特征转换器输入一个数据框,读取一个文本列,将其映射为新的特征向量列。输出一个新的带有特征向量列的数据框。
2.一个学习模型转换器输入一个数据框,读取包括特征向量的列,预测每一个特征向量的标签。输出一个新的带有预测标签列的数据框。

估计器
估计器指用来拟合或者训练数据的学习算法或者任何算法。技术上说,估计器通过fit()方法,接受一个数据框产生一个模型。比如,逻辑回归就是一个估计器,通过fit()来产生一个逻辑回归模型。

管道组件的特性
转换器的transform()方法和估计器的fit()方法都是无状态性的。将来,有状态性的算法可能通过其他概念得到支持。
每个转换器或估计器实例有唯一的编号,这个特征在制定参数的时候非常有用。

管道
在机器学习中,运行一系列算法来处理和学习数据是非常常见的。如一个文档数据的处理工作流可能包括下列步骤:
1.将文档氛围单个词语。
2.将每个文档中的词语转为数字化的特征向量。
3.使用特征向量和标签学习一个预测模型。
MLlib将上述的工作流描述为管道,它包含一系列需要被执行的有顺序的管道阶段(转换器和估计器)。本节中我们将会使用上述文档处理工作流作为例子。

工作原理
管道由一系列有顺序的阶段指定,每个状态时转换器或估计器。每个状态的运行是有顺序的,输入的数据框通过每个阶段进行改变。在转换器阶段,transform()方法被调用于数据框上。对于估计器阶段,fit()方法被调用来产生一个转换器,然后该转换器的transform()方法被调用在数据框上。
下面的图说明简单的文档处理工作流的运行。

pipeline_1.png

上面的图示中,第一行代表管道处理的三个阶段。第一二个蓝色的阶段是转换器,第三个红色框中的逻辑回归是估计器。底下一行代表管道中的数据流,圆筒指数据框。管道的fit()方法被调用于原始的数据框中,里面包含原始的文档和标签。分词器的transform()方法将原始文档分为词语,添加新的词语列到数据框中。哈希处理的transform()方法将词语列转换为特征向量,添加新的向量列到数据框中。然后,因为逻辑回归是估计器,管道先调用逻辑回归的fit()方法来产生逻辑回归模型。如果管道还有其它更多阶段,在将数据框传入下一个阶段之前,管道会先调用逻辑回归模型的transform()方法。

整个管道是一个估计器。所以当管道的fit()方法运行后,会产生一个管道模型,管道模型是转换器。管道模型会在测试时被调用,下面的图示说明用法。

pipeline_2.png

上面的图示中,管道模型和原始管道有同样数目的阶段,然而原始管道中的估计器此时变为了转换器。当管道模型的transform()方法被调用于测试数据集时,数据依次经过管道的各个阶段。每个阶段的transform()方法更新数据集,并将之传到下个阶段。

管道和管道模型有助于确认训练数据和测试数据经过同样的特征处理流程。

详细信息

DAG管道:管道的状态是有序的队列。这儿给的例子都是线性的管道,也就是说管道的每个阶段使用上一个阶段产生的数据。我们也可以产生非线性的管道,数据流向为无向非环图(DAG)。这种图通常需要明确地指定每个阶段的输入和输出列名(通常以指定参数的形式)。如果管道是DAG形式,则每个阶段必须以拓扑序的形式指定。

运行时间检查:因为管道可以运行在多种数据类型上,所以不能使用编译时间检查。管道和管道模型在实际运行管道之前就会进行运行时间检查。这种检查通过数据框摘要,它描述了数据框中各列的类型。

管道的唯一阶段:管道的的每个阶段需要是唯一的实体。如同样的实体“哈希变换”不可以进入管道两次,因为管道的每个阶段必须有唯一的ID。当然“哈希变换1”和“哈希变换2”(都是哈希变换类型)可以进入同个管道两次,因为他们有不同的ID。

参数

MLlib估计器和转换器使用统一的接口来指定参数。Param是有完备文档的已命名参数。ParamMap是一些列“参数-值”对。

有两种主要的方法来向算法传递参数:

1.给实体设置参数。比如,lr是一个逻辑回归实体,通过lr.setMaxIter(10)来使得lr在拟合的时候最多迭代10次。这个接口与spark.mllib包相似。
2.传递ParamMap到fit()或者transform()。所有在ParamMap里的参数都将通过设置被重写。

参数属于指定估计器和转换器实体过程。因此,如果我们有两个逻辑回归实体lr1和lr2,我们可以建立一个ParamMap来指定两个实体的最大迭代次数参数:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。这在一个管道里有两个算法都有最大迭代次数参数时非常有用。

存储和读取管道

我们经常需要将管道存储到磁盘以供下次使用。在Spark1.6中,模型导入导出功能新添了管道接口,支持大多数转换器。请到算法接口文档查看是否支持存储和读入。

代码示例
下面给出上述讨论功能的代码示例:
估计器、转换器和Param示例:
Scala:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
 
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
 
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
 
// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)
 
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
 
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.
 
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
 
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
 
// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
 
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

Java:

import java.util.Arrays;
import java.util.List;
 
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
 
// Prepare training data.
List<Row> dataTraining = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
    RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
 
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
 
// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);
 
// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
 
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20))  // Specify 1 Param.
  .put(lr.maxIter(), 30)  // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55));  // Specify multiple Params.
 
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability"));  // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
 
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
 
// Prepare test documents.
List<Row> dataTest = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
 
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}

Python:

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
 
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
 
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
 
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
 
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print "Model 1 was fit using parameters: "
print model1.extractParamMap()
 
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.
 
# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
 
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print "Model 2 was fit using parameters: "
print model2.extractParamMap()
 
# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
 
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
    print row

管道示例:
Scala:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
 
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
 
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))
 
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
 
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
 
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
 
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
 
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")
 
// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

Java:

import java.util.Arrays;
 
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
 
// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L, "spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
 
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
 
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
 
// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "mapreduce spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
 
// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}

Python:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
 
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
 
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
 
# Fit the pipeline to training documents.
model = pipeline.fit(training)
 
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")], ["id", "text"])
 
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

转自https://blog.csdn.net/liulingyuan6/article/details/53576550

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容