缓存/持久化
和RDD类似,DStream允许开发者将流数据持久化到内存。使用在DStream上使用persist()
方法会自动持久化DStream中的每个EDD到内存中。这对于DStream需要计算多次的情况非常有用(如在相同数据上进行多个操作)。对于window-based操作(如reduceByWindow
和reduceByKeyAndWindow
)和state-based操作(如updateStateByKey
),会隐式地进行持久化,因此,通过window-based操作生成的DStream会自动持久化到内存,不需要开发者调用persist()
。
对于通过网络接收的输入数据流(如Kafka,Flume,socket等),默认持久化级别设置为在两个节点中保存副本,以便进行容错处理。
注意,不像RDD,DStream的偶人持久化级别会将数据序列化保存到内存中。在之后性能调优中会进一步讨论,更多关于持久化级别的信息参见Spark编程指南(二)。
检查点
streaming应用程序必须7*24小时运行,因此必须对于应用程序逻辑无关的错误具有弹性(如系统错误,JVM崩溃等)。Spark Streaming需要将足够的检查点信息放到容错存储系统,以便之后从错误中恢复,有两种类型的数据需要设置检查点。
-
元数据检查点-将定义streaming计算的信息存储到容错存储系统中,如HDFS。用于从运行streaming驱动程序的节点错误中恢复,元数据包括:
- 配置-用于创建streaming应用程序的配置。
- DStream操作-定义streaming应用程序的DStream操作集合。
- 未完成的批次-批次的作业在队列中但是还没计算完成。
数据检查点-保存生成的RDD到可靠的存储系统中,对于一些带状态的转换(跨多个批次合并数据)是必要的。在这样的转换中,生成的RDD要依赖之前批次的RDD,导致依赖链的长度会随着时间增长。为避免这种无限增长,带状态转换的中间RDD会周期性地在可靠存储系统中保存检查点(如HDFS)来切断依赖链。
总结来说,元数据检查点用于从驱动程序的错误中恢复,如果使用了带状态的转换,数据或RDD检查点是必要的。
何时启用检查点
以下几种情况必须为应用程序启用检查点:
- 使用了带状态的转换-如果在应用程序中使用了
updateStateByKey
或reduceByKeyAndWindow
,就必须提供检查点目录用于周期性地存储RDD检查点。 - 从驱动程序的错误中恢复-检查点元数据用于恢复进度信息。
注意,没有使用带状态转换的简单streaming应用程序可以不启用检查点。在这种情况下,从驱动程序错误中恢复也是局部的(一些已经接收但是没有处理的数据可能会丢失)。这通常是可接受的,很多Spark Streaming应用程序用这种方式运行。对非Hadoop环境的支持会在未来进行改善。
如何配置检查点
启用检查点,需要设置一个容错可靠的文件系统(如HDFS,S3等)中的目录,用于存储检查点信息。使用streamingContext.checkpoint(checkpointDirectory)
进行设置。这样就可以使用带状态的转换了。另外,如果想要让应用程序从驱动程序的错误中恢复,需要重写streaming应用程序包含以下行为。
- 当程序第一次启动时,会创建一个新的StreamingContext,然后调用start()。
- 当程序在错误后重新启动时,会从检查点数据中重新创建StreamingContext。
这些行为可使用StreamingContext.getOrCreate
完成,如下。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory
存在,那么上下文会通过检查见数据重新创建。如果目录不存在(如第一次运行),那么函数functionToCreateContext
会创建新的上下文和DStream。可以参考示例RecoverableNetworkWordCount。这个示例程序会像一个文件追加网络数据的单词计数。
除了使用getOrCreate
之外,还需要保证驱动程序在出现错误时会自动重新启动。这只能通过应用程序的部署方式来完成。之后会进一步讨论。
注意,RDD的检查点会带来存储到可靠存储系统的成本,这会导致需要RDD检查点的批次处理时间增加。因此,检查点的时间间隔需要小心设置。对于比较小的批次,每个批次的检查点可能会明显减少操作的吞吐量。相反,检查点太少会导致任务规模的增加,带来不利影响。对于需要RDD检查点的带状态转换,默认时间间隔是批时间间隔的倍数,不少于10s。可通过使用dstream.checkpoint(checkpointInterval)
进行设置。通常,检查点时间间隔为5-10个DStream时间间隔是比较好的。
累加器,广播变量和检查点
累加器和广播变量不能从Spark Streaming的检查点中恢复。如果启用了检查点并且同时使用了累加器或广播变量,必须为累加器或广播变量创建懒实例化的单例,以便在驱动程序从失败中重新启动后可以重新实例化它们。下面是个示例。
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
完整代码参见source code。
部署应用程序
这一节讨论部署Spark Streaming应用程序的步骤。
要求
要运行Spark Streaming应用程序,需要以下几点。
- 有集群管理器的集群 - 这是任何Spark应用程序的通用要求,具体参见deployment guide。
- 打应用程序JAR包 - 必须将streaming应用程序编译成JAR包。如果使用spark-submit
启动应用程序,那就不需要再JAR报中提供Spark和Spark Streaming。但是,如果使用高级源(如Kafka,Flume),那就必须将额外的库及其引用打到JAR包中。例如,使用KafkaUtils
的应用程序必须将spark-streaming-kafka-0-8_2.11
及其传递依赖打到JAR包中。 - 为executor配置足够的内存 - 因为接收的数据必须存储在内存中,所以executor必须配置足够的内存。注意如果正在执行10分钟的window operations,系统必须在内存中保持至少10分钟的数据。应用程序的内存要求是根据其使用的操作决定的。
- 配置检查点 - 如果streaming应用程序需要,则需要配置检查点目录,是Hadoop API兼容的容错存储目录(如HDFS,S3等),编写streaming应用程序需要使用检查点信息进行错误恢复。具体参见之前检查点相关内容。
- 配置应用程序驱动自动重启 - 为了自动从驱动程序错误中恢复,部署streaming应用程序必须监控驱动进程并且如果出现错误要重启。不同的集群管理器有不同的工具实现这个机制。
- Spark Standalone - Spark程序可以在Spark Standalone集群中运行(参见cluster deploy mode),这就是说,应用驱动程序在一个worker节点上运行。因此,Standalone集群管理器可用于监控驱动程序,并且在驱动程序因为non-zero退出代码或节点错误出错时进行重启。
- YARN - Yarn支持自动重启应用程序的类似机制。请参见YARN文档。
- Mesos - 在Mesos中Marathon用于实现这个机制。
- 配置写ahead日志 - 从Spark 1.2开始,引入了写ahead日志以实现强大的容错保证,如果启用,所有从receiver接收到的数据都会写入ahead日志中存储在配置好的检查点目录中。避免了驱动程序恢复时数据丢失的问题,可以做到零数据丢失。可以通过设置
spark.streaming.receiver.writeAheadLog.enable
为true
来启用。但是,可能会以降低单个receiver的吞吐量为代价。可通过并行运行多个receiver来增加吞吐量。此外,当写ahead日志时,推荐将接收数据的副本关闭,因为日志已经存储到复制存储系统中了。可以通过设置输入流的存储等级为StorageLevel.MEMORY_AND_DISK_SER
来实现。当使用S3(或者任何不支持flushing的文件系统)写ahead日志时,请家住启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
。具体参见Spark Streaming Configuration。注意,在启用I/O加密时,Spark不会加密写入ahead日志中的数据。如果想要加密ahead日志中的数据,应该写入原生支持加密的文件系统。 - 设置最大接收速率 - 如果集群资源对应streaming应用程序尽可能快递处理数据不够的话,receiver可以进行限速,设置最大接收速率限制records / sec。参见configuration parameters中的receiver参数
spark.streaming.receiver.maxRate
和Kafka参数spark.streaming.kafka.maxRatePerPartition
。在Spark 1.5中,介绍了一个叫做backpressure的特性,消除设置此速率限制的必要性,Spark Streaming会自动计算速率限制,并在处理条件改变时动态适配。可以通过设置spark.streaming.backpressure.enabled
为true
来启用backpressure。
升级应用程序代码
如果正在运行的Spark Streaming应用程序需要升级到新的应用程序代码,有两种可行的机制。
- 升级后的Spark Streaming应用程序启用与现有应用程序并行运行。一旦新应用程序(与旧应用程序接收同样的数据)已经预热好并且运行良好即可将旧的应用程序停掉。注意,这可用于支持方到两个目的地(分别发给新旧应用程序)的数据源。
- 现有应用程序正在优雅地关闭(参见StreamingContext.stop(...)
或JavaStreamingContext.stop(...)
优雅关闭选项),确保已接收的数据在关机前已经完全处理好。然后升级的应用程序可以启动,将从之前应用程序关闭的同一点开始处理。注意,只有输入源支持源端缓冲(如Kafka和Flume),数据才能在之前的应用程序已经关闭但是升级后的应用成还没启动时进行缓冲。
监控应用程序
除了Spark的监控功能,Spark Streaming还有特定的监控功能。当使用StreamingContext时,Spark web UI有一个额外的Streaming
tab页,显示关于正在运行的receiver的统计数据(receiver是否活跃,接收到记录的数量,receiver错误等)以及完成的批次信息(批次处理时间,队列延迟等)。这些都可以监控streaming应用程序的进程。
web UI中下面两个指标尤为重要:
- Processing Time - 处理每个批次数据的时间。
- Scheduling Delay - 批次在队列中等待前一个批次处理完成的时间
如果批处理时间总是大于批时间间隔和/或排队延迟持续增加,则说明系统不能快速处理这些批次,正在落后。在这种情况下,要考虑减少批处理时间。
Spark Streaming的进程也可以使用StreamingListener接口进行监控,这个接口允许获取receiver的状态以及处理时间。注意这是一个开发者API,未来会改进(报告更多的信息)。