1.1 Receiver-based Approach
这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据。
Receiver-based的Kafka读取方式是基于Kafka高阶(high-level) api来实现对Kafka数据的消费。在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取Kafka的数据,读取时间间隔以及每次读取offsets范围可以由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定,诸如MEMORY_ONLY等。当driver 触发batch任务的时候,Receivers中的数据会转移到剩余的Executors中去执行。在执行完之后,Receivers会相应更新ZooKeeper的offsets。如要确保at least once的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。具体Receiver执行流程见下图:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
还有几个需要注意的点:
- 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
- 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
- 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成StorageLevel.MEMORY_AND_DISK_SER,也就
是KafkaUtils.createStream(...,StorageLevel.MEMORY_AND_DISK_SER)
4.1.1 源码分析
需要查看
currentBuffer
在哪里被处理,搜索代码,发现而
updateCurrentBuffer
方法又在这里被回调period
就是上面传入的,blockIntervalMs
,默认200ms切一个block,这也是一个优化参数这里参考博客,暂时未找到从哪里找到的
BlockRDD
类中getPartitions
方法是说将这个batch的blocks作为partitions。Compute
方法则按照入参BlockRDDPartition
的blockId
,从blockManager
中获取该block作为partition的数据。getPreferredLocations
则是将BlockRDDPartition
所在的host作为partition的首选位置,移动计算,不移动数据原则。
1.2 Direct Approach (No Receivers)
Direct方式采用Kafka简单的consumer api方式来读取数据,无需经由ZooKeeper,此种方式不再需要专门Receiver来持续不断读取数据。当batch任务触发时,由Executor读取数据,并参与到其他Executor的数据计算过程中去。driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由Executor读取Kafka数据并计算。从此过程我们可以发现Direct方式无需Receiver读取数据,而是需要计算时再读取数据,所以Direct方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外batch任务堆积时,也不会影响数据堆积。其具体读取方式如下图:
这种方法相较于Receiver方式的优势在于:
- 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
- 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
- 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。
4.2.1 源码分析
val stream = KafkaUtils.createDirectStream()
这里两个优化参数:
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
val maxRetries = context.sparkContext.getConf.getInt("spark.streaming.kafka.maxRetries", 1)
优化参数 "spark.streaming.kafka.maxRatePerPartition"
在getPartitions方法中可以看到,KafkaRDD的partition个数就是topic的partition个数之和。
compute方法是RDD用来构建一个partition的数据的。