一 Spark中的一致性
1 Spark RDD的基本容错语义
- RDD是不可变的,确定的,可以被重新计算的分布式数据集。每个RDD都记录了已经确定的操作先后关系。
- 如果由于任何一个节点的错误导致RDD的任何一个分区丢失,该分区都可以从原始的数据集中通过操作的先后关系重新计算出来。
- 所有的RDD转换都是明确的,那么在最终转换后的RDD的数据总是相关的,与Spark cluster的错误无关。
可以看到Spark RDD已经为我们提供了比较好的容错能力。Spark进行数据的操作在可容错的文件系统中,如HDFS或者S3。因此从容错数据中生成的所有RDD也是容错的。为了所有的生成的RDD达到相同的容错性,RDD的数据会在集群中工作节点上多个executor中创建副本(默认复制因子是2)。
2 系统异常的场景
1.工作节点错误(Failure of a Worker Node ) 任何执行executor的worker node都可能失败,这些节点上的内存数据会丢失。任何运行在一个失败节点上的receivers,都会丢失缓存数据(buffered data)。
2.Driver节点错误(Failure of the Driver Node) 如果运行Spark Streaming 应用的driver node失败,显然SparkContext丢失,那么所有executor的内存数据都会丢失。
3 Spark Streaming中的容错语义
在理想的情况下,一个系统能够提供的三种类型的一致性
- At most once: 每个record 要么被处理一次或者不会被处理
- At least once: 每个record会被处理一次或者多次。 这个比at-most once一致性要强,因为它保证了没有数据会丢失。但是可能会有很多重复的数据。
- Exactly once: 每个record都会被处理有且一次。没有数据会丢失,并且也没有数据会被处理多次。这个显然是三者中一致性最强的。
4 Exactly once的讨论
如果要做到Exactly once ,就需要在整个数据处理的过程中都做到,包括:
- Receiving the data: 数据的接收过程
- Transforming the data: 数据的处理转换过程。Spark Streaming中使用DStream 和 RDD 的转换。由于RDD的机制,Spark的处理保证了exactly once。
- Pushing out the data: 数据的输出 最终转换的数据输出到外部的系统中,比如文件系统,数据库等。 默认的输出操作保证了at-least once语义, 但是基于输出操作和下游的系统(是否支持事务等)。 但是用户可以实现自己的事务机制,以达到exactly-once 语义。
在数据输出层面,为了达到exactly-once,有两种方式:
- Idempotent updates: 幂等更新 多次写总是写相同的数据。 例如saveAs***Files 总是将相同的数据写入到文件中。这里比较典型的情况是输出基于key-value的数据库,即使在at-most-once的情况下,有些数据可能会出现多次,但是会多次写对于最终的输出正确性也不会有正确性的影响。
- Transactional updates: 所以的更新都是基于事务的,所以所有的更新都是exactly-once.
一种基于事务的达到exactly-once方式如下: - 使用foreachRDD中的batch time和RDD的分区索引(partition index)创建一个标识符。这个标识符唯一标记了streaming 应用中的数据。
- 通过事务更新这部分数据到外部存储中,如果该标识符没有提交,那么原子的提交分区数据和标识符。
二 Spark Streaming 与Kafka集成的一致性讨论
将Kafka作为Spark Streaming的数据输入端,与Kafka集成的旧API通过基于Receiver的机制来从数据源中获得数据,有很多相关的讨论如何来达到可靠性以及高性能。 新的API通过Kafka Direct API保证了从Kafka接收的数据是exactly once。 旧的Kafka API使用起来需要考虑的问题比较多,推荐使用新的Kafka Direct API。下面也主要围绕新的API讨论。
val directKafkaStream = KafkaUtils.createDirectStream
在该版本中,创建出的directKafkaStream的offset只会在该stream内存中存储,并不会存储到zookeeper中。根据上述的讨论,为了实现exactly-once,我们有两种方式:1 在一个事务中,提交数据和offset到存储中。2 在幂等操作的输出后,存储offset。
对于存储offset 有三个选择:
1 Checkpoints
启用kafka的Checkpoints机制。offset会自动的存储到checkpoint中。启用Checkpoints的问题也很明显:如果提交的spark应用的代码发生了变化之后,不能从checkpoint中进行恢复。另外,为了达到exactly-once 需要输出的存储是幂等的。
2 Kafka中
在Kafka broker version 0.10.0的API中,默认会自动的提交offset到kafka中(实际是写入到了zookeeper中kafka相关的路径中)。但是默认的自动提交的时间我们并不知道,这些数据是否写入的存储中并不确定。所以应当enable.auto.commit置为false。 当数据写入到存储中时,我们应当通过commitAsync API来提交offset到kafka中。由于kafka不是事务性的,所以为了exactly-once,输出需要是幂等的。
Kafka broker version 0.8.2.1
在该版本中,并不会提交offset。需要我们自行实现保存offset到zookeeper的功能。 实际上 Kafka broker version 0.10.0所使用的提交offset所用到的类是KafkaCluster,在该版本中存在,只是相关代码没有作为API公开出来。我们可以直接从源码中提取该类的代码直接使用。
3 自定义的存储
能够存储offset和结果数据在同一个事务中。需要我们自己检测跳过还是采用offset以避免使数据重复或者丢失数据。
exactly-once听起来非常美好,但是在很多应用场景下实现成本较高。
在对数据可靠性要求不高的场景下,甚至at-least-once都可以作为一个选项,尤其对于数据实时性要求更高的场景下。而在工程实践中,代码版本的更新迭代是不可避免的,一旦旧版本中出现了bug导致服务不能正常运行,数据将不能恢复,所以并不建议使用checkpoint。
参考
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html