介绍
许多应用需要处理及时收到的数据,Spark Streaming是Spark为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的 DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比如滑动窗口。
转化操作
DStream 的转化操作可以分为无状态(stateless)和有状态(stateful)两种。
• 在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。第3章和第4章中所讲的常见的 RDD 转化操作,例如 map()、filter()、reduceByKey() 等,都是无状态转化操作。
• 相对地,有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。
无状态转换的例子:
- map
- flatmap
- filter
- repartition
- reducebykey
- groupbykey
无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键值对 DStream 拥有和 RDD 一样的与连接相关的转化操作,也就cogroup()、join()、leftOuterJoin() 等
Transform(): 可以让你直接操作其内部的rdd。
val outlierDStream = accessLogsDStream.transform { rdd =>
extractOutliers(rdd)
}
有状态转换
DStream 的有状态转化操作是跨时间区间跟踪数据的操作;也就是说,一些先前批次的数据也被用来在新的批次中计算结果。主要的两种类型是滑动窗口和 updateStateByKey(),前者以一个时间阶段为滑动窗口进行操作,后者则用来跟踪每个键的状态变化(例如构建一个代表用户会话的对象)。
基于窗口的转化操作
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果,就应该把滑动步长设置为 20 秒。
val el=errerlines.window(Seconds(3), Seconds(3))
el.count()
尽管可以使用window()写出所有的窗口操作,Spark Streaming还是提供了一些其他的窗口操作,让用户可以高效而方便地使用。首先,reduceByWindow() 和 reduceByKeyAndWindow()让我们可以对每个窗口更高效地进行归约操作。
输出操作
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
print
ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
saveashadoopfiles
-
Sequence file
val writableIpAddressRequestCount = ipAddressRequestCount.map { (ip, count) => (new Text(ip), new LongWritable(count)) } writableIpAddressRequestCount.saveAsHadoopFiles[ SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")
foreachrdd
24/7不间断运行
检查点机制
检查点机制是我们在Spark Streaming中用来保障容错性的主要机制。它可以使SparkStreaming阶段性地把应用数据存储到诸如HDFS或Amazon S3这样的可靠存储系统中,以供恢复时使用。具体来说,检查点机制主要为以下两个目的服务。
- 控制发生失败时需要重算的状态数。SparkStreaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
- 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样Spark Streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。
驱动器程序容错
驱动器程序的容错要求我们以特殊的方式创建 StreamingContext。我们需要把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不同,应该使用StreamingContext.getOrCreate() 函数。
性能考量
批次和窗口大小(500ms)
并行度
• 增加接收器数目有时如果记录太多导致单台机器来不及读入并分发的话,接收器会成为系统瓶颈。这时你就需要通过创建多个输入 DStream(这样会创建多个接收器)来增加接收器数目,然后使用 union 来把数据合并为一个数据源。
• 将收到的数据显式地重新分区
如果接收器数目无法再增加,你可以通过使用 DStream.repartition 来显式重新分区输
入流(或者合并多个流得到的数据流)来重新分配收到的数据。
• 提高聚合计算的并行度
对于像 reduceByKey() 这样的操作,你可以在第二个参数中指定并行度。