第六章 Flink中的时间和窗口

时间语义


上图是数据流式处理过程,涉及到两个重要的时间点:事件时间(Event Time)和处理时间(Processing Time)。

  • 事件时间(Event Time):即数据产生的时间;
  • 处理时间(Processing Time):即数据真正被处理的时刻;

我们在处理数据时,以哪种时间作为衡量标准,就是所谓的时间语义问题(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有滞后。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

水位线(Watermark)

我们把时钟也数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接到广播下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似流水中用来当做标志的几号,在Flink中,这种用来衡量事件时间(Event Time)进展的标记,就被称作水位线(Watermark)。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而他插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。


理想的水位线是有序的,但是现实中由于不可控因素常常会有少量乱序的数据。




周期性生成时间戳,保存区间最大值

水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据,这一点对于乱序流的正确处理非常重要。水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据;
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展;
  • 水位显示基于数据的时间戳生成的;
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进;
  • 水位线可以通过设置延迟,以保证正确处理乱序数据;
  • 一个水位线(Watermark)t表示在当前流中事件时间已经到达了时间戳t,这导表t之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据;

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

如何生成水位线

在生成水位线的时候,如果希望计算结果更准确,可以将水位线延迟设置得更高一些,等待时间越长,越不容易漏掉数据,但是这样时效性降低了。而如果将等待时间设置过短则会遗漏掉部分数据,虽然Flink提供了处理迟到数据的方法,但是需要分开处理。因此如何设置延迟是一个需要根据实际情况权衡的问题。

在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampAndWatermarks(),他主要用来为流中的数据分配时间戳,并生成水位线来显示时间。该方法需要传入一个WatermarkStrategy作为参数,WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator:

  • TimestampAssigner, 主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础;
  • WatermarkGenerator, 主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator中主要有两个方法:onEvent和onPeriodicEmit;
    • onEvent, 每个事件(数据)到来都会调用的方法,他的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作;
    • onPeriodiEmit, 周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的setAutoWatermarkInterval()方法来设置,默认为200ms;
代码

Flink提供了内置的水位线生成器:

  • 有序流,对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
  • 乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的时间延迟(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码
自定义水位线

在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了。不同策略的关键在于WatermarkGenerator的实现。整体来说,Flink有两种不同的生产水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated):

  • 周期性水位线生成器(Periodic Generator),周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线;
  • 断点式水位线生成器(Punctuated Generator),断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

此外,也可以在自定义数据源中发送水位线,但是这样就不能使用assignTimestampsAndWatermarks 方法来生成水位线了,两者只能二选一。

水位线的传递

在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这说明上游各个分区处理得有快有慢,进度各不相同,这时我们应该以最慢的那个时钟,也就是最小的那个水位线为准。

窗口(Window)

Flink是一种流式计算引擎,主要是用来处理无界数据流的。想要更加方便的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(window)。在Flink中,窗口就是用来处理无界流的核心。

由于存在迟到数据的问题,将窗口视为一个框可能并不是最合适的。我们可以把它理解成一个“桶”(bucket):每个数据都会分发到对应的桶中,当到达窗口的结束时间时,就对每个桶中收集的数据进行计算处理。


窗口的分类

  • 按照驱动类型分类:
    • 1)时间窗口,时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据;
    • 2)计数窗口,计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。
  • 按照窗口分配数据的规则分类:1)滚动窗口,2)滑动窗口,3)会话窗口,4)全局窗口;





窗口API概览

在定义窗口操作之前,需要先确定到底是基于按键分区的数据流KeyedStream还是在没有按键分区的DataStream上面开窗。也即调用窗口算子之前是否有keyBy操作。

而在API上面的区别也是非常小:

// 按键分区
stream.keyBy(...).window(...)

// 非按键分区
stream.windowAll(...)

窗口分配器(Window Assigner)

定义窗口分配器(Window Assigner)是构建窗口算子的第一步,他的作用就是定义数据应该被分配到哪个窗口。通过向上一节中的window/windowAll函数中传入WindowAssigner参数,返回WindowStream。

不同窗口类型有不同的窗口分配器。

1、时间窗口
// 滚动处理时间窗口
stream.keyBy(...)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5))
  .aggregate(...)

// 滑动处理时间窗口
stream.keyBy(...)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
  .aggregate(...)

// 处理时间会话窗口
stream.keyBy(...)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  .aggregate(...)

// 滚动事件时间窗口
stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(...)

// 滑动事件时间窗口
stream.keyBy(...)
  .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .aggregate(...)

// 事件时间会话窗口
stream.keyBy(...)
  .window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
  .aggregate(...)

// 2、计数窗口
// 滚动计数窗口, 定义一个长度为10的滚动计数窗口
stream.keyBy(...)
  .countWindow(10)

// 滑动计数窗口,长度为10,步长为3
stream.keyBy(...)
  .countWindow(10, 3)

// 3、全局窗口, 全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
stream.keyBy(...)
  .window(GlobalWindows.create())

窗口函数(Window Functions)

在上面定义了窗口分配器,我们只是知道了数据属于哪个窗口,而本节介绍的窗口函数则是如何将这些窗口中的数据收集起来,即如何处理。

窗口函数是作用在windowStream上面的,返回的是DataStream。各种stream间的转换如下:


1、增量聚合函数
为了提高实时性,我们可以像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

ReduceFunction:

package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

object WindowFunctionDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      // 数据源中的时间戳是单调递增的,所以使用下面的方法,只需要抽取时间戳就好了
      // 等同于最大延迟时间是0毫秒
      .assignAscendingTimestamps(_.timeStamp)
      .map(r => (r.user, 1L))
      // 使用用户名对数据流进行分组
      .keyBy(_._1)
      // 设置5秒钟的滚动事件时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 保留第一个字段,针对第二个字段进行聚合
      .reduce((r1, r2) => (r1._1, r1._2+r2._2))
      .print()
    
    env.execute()
  }
}

AggregateFunction
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
  // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
  ACC createAccumulator();

  // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程
  ACC add(IN value, ACC accumulator);

  // getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出
  OUT getResult(ACC accumulator);

  // 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用
  ACC merge(ACC a, ACC b);
}

AggregateFunction接受3个数据类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 通过为每条数据分配相同的key,来将数据发送到同一个分区
      .keyBy(_ => "key")
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      .aggregate(new AvgPv)

    env.execute()
  
  class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
    // 创建空累加器,类型是元组,元组的第一个元素类型为Set数据结构,用来对用户名去重
    // 第二个元素用来累加pv操作,也就是没来一条数据就加一
    override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)

    // 累加规则
    override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
      (acc._1+in.user, acc._2+1)
    }

    // 获取窗口关闭时向下游发送的结果
    override def getResult(acc: (Set[String], Double)): Double = {
      acc._2/(acc._1.size.toDouble)
    }

    // merge方法只有在事件时间的会话窗口时,才需要实现,这里无需实现
    override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
  }

全窗口函数(Full Window Functions)
窗口操作中的另一大类就是全窗口函数,与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。

// 窗口函数
stream.keyBy(<key selector>)
  .window(<window assigner>)
  .apply(new MyWindowFunction())

处理窗口函数ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象(context)”。这个上下文对象不仅有窗口信息,还可以访问当前的时间和状态信息。这里的时间包括了处理时间(process time)和事件时间水位线(event time watermark)。这使得ProcessWindowFunction更加灵活、功能更加丰富,可以认为是一个增强版的WindowFunction。

// Full WindowFunction
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 为所有数据都指定同一个key,可以将所有数据发送到同一个分区
      .keyBy(_ => "key")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new UvCountByWindow)
      .print()
    
    env.execute()
  
  // 自定义窗口处理函数
  class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
    // 
    override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
      // 初始化一个Set数据结构,用来对用户名进行去重
      var userSet = Set[String]()
      // 将所有用户进行去重
      elements.foreach(userSet += _.user)
      // 结合窗口信息,包装输出内容
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 独立访客数为:" + userSet.size)
    }
增量和聚合函数结合使用

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作,窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数
据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。

    // 全窗口函数和聚合函数结合使用
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 使用url作为key对数据进行分区
      .keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      // 注意这里调用的是aggregate方法
      // 增量聚合函数和全窗口聚合函数结合使用
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
      .print()

  class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
    override def createAccumulator(): Long = 0L

    // 每来一个事件就加1
    override def add(in: Event, acc: Long): Long = acc + 1L

    // 窗口闭合时发送的计算结果
    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = ???
  }

  case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)

  class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    // 迭代器中只有一个元素,是增量聚合函数在窗口闭合时发送过来的计算结果
    override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
    }
  }

其它API

  • 触发器(Trigger):触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程;Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现;
  • 移除器(Evictor):移除器主要用来定义移除某些数据的逻辑;
  • 允许延迟(Allowed Lateness):为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算;
  • 将迟到的数据放入侧输出流:Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些本该被丢弃的数据。

窗口的生命周期

熟悉了窗口 API 的使用,这里梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结:

  • 窗口的创建;
  • 窗口计算的触发;
  • 窗口的销毁;


迟到数据的处理

所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

  • 设置水位线延迟时间;
  • 允许窗口处理迟到数据;
  • 将迟到数据放入窗口侧输出流;
package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}
import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp

object ProcessLateDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    // 为方便测试,读取socket文本流进行处理
    val stream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val fields = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })
    
    // 方式1:设置Watermark延迟时间 2秒钟
    val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
      // 最大延迟时间设置为5秒钟
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
      .withTimestampAssigner( new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
      })
    )
    
    // 定义侧输出流标签
    val outputTag = OutputTag[Event]("late")
    val res2 = stream
      .keyBy(_.url)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
      .allowedLateness(Time.minutes(1))
      // 方式三:将最后的迟到数据输出到侧输出流
      .sideOutputLateData(outputTag)
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
    
    res2.print()
    
    res2.getSideOutput(outputTag).print("late")
    
    // 为方便观察,可以将原始数据也输出
    stream.print("input")
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容