Spark-streaming-2.0-Kafka数据接收并行度源码学习

前段时间学习了spark streaming采用kafka作为数据源时,数据接收并行度这一部分的源代码。本文主要将学习的体会记录一下,有理解不对的地方请多多指教。

Streaming从kafka接收数据有Receiver和direct两种方式。下面我们看一下这两种方式的源码。

Direct approach

这种方式是使用kafka的低阶API从kafka消费数据。一般如果需要自行维护partition的offset,实现自定义checkpoint文件,或者exactlyOnce场景下就会用到这一方式。

首先需要看一下DirectKafkaInputDStream这个类,他是我们调用KafkaUtil.

createDirectStream方法生成的用来从kafka端接收数据的。

compute方法定义了InputDStream是如何根据指定的batchTime生成RDD的。

latestLeaderOffsets方法是获取当前InputDStream所包含的topic下所有的partition的最新offset的。Clamp方法是根据spark.streaming.kafka.maxRatePerPartition和backpressure这两个参数来设置当前block可以消费到的offset的(即untilOffset)。这个数值需要跟partition最新的offset取最小值。

maxMessagesPerPartition方法实现了获取某个partition能消费到的message的数量。该方法首先会计算一个每分区每秒钟消费的消息数上线effectiveRateLimitPerPartition,他的value如下图红框中,是在spark.streaming.kafka.maxRatePerPartition和batckpressure中取一个最小值,如果只配置了一个则以配置的为准,都没配置则返回None,返回None时直接取leader最新的offset。然后再根据batchTime计算出某partition在batchTime内能消费的消息数上限。

其中backpressure是spark1.5版本之后增加的参数,能够根据上一个batch的执行效率,动态估算出当前batch能处理的最大消息数。这个参数在每个batch计算完成后,会通过StreamingListenerBus监听StreamingListenerBatchCompleted事件,然后由org.apache.spark.streaming.scheduler.

onBatchCompleted方法来重新计算,如下:

Backpressure的具体实现思路先不展开了(计算公式在PIDRateEstimator.compute方法中)。我们回到DirectKafkaInputDStream.compute方法。当计算完每个partition的untilOffset之后,会根据当前InputDStream所消费的topic的每个partition的currentOffset和untilOffset构建KafkaRDD。

在kafkaRDD中我们可以看到他重写的一些RDD的方法,

在getPartitions方法中可以看到,KafkaRDD的partition个数就是topic的partition个数之和。

在getPreferredLocations方法中可以看到,partition的首选location就是该topic的某个partition的leader所在的host。这是很合理的,因为leader上的数据正常情况下是最新的而且是最准确的。而follower的数据往往还需要从leader上做同步,并且一旦同步出现较大的落后,还会从in-sync列表中移除。而且kafka的读写都是通过leader进行的。

关于方法中part.host可以一路反推回去,会跟踪到KafkaCluster.getLeaderOffsets方法中调用的findLeaders方法,即part.host就是leader的host。

compute方法是RDD用来构建一个partition的数据的。

我们看一下用来从partition中获取数据的KafkaRDDIterator类。在类体中会发现

val consumer = connectLeader

的代码,这说明一点,spark streaming的kafka低阶API是每一个partition起一个consumer来消费数据的。

然后我们看一下fetchBatch方法。该方法中是我们很熟悉的一段根据起止offset消费kafka某topic某partition数据的代码。

通过kafkaRDD这个类的阅读我们可以看出,接收数据是以partition的leader为维度做分布式的,这样做可以保证这个host上是有我要消费的数据的,能够实现数据本地化。

Receiver

这种方式是采用kafka的高阶API来消费数据的。

建立InputDStream的代码如下:

从KafkaUtils.createStream开始跟到KafkaInputDStream类,

getReceiver()方法中的变量useReliableReceiver是判断是否配置了WAL机制。如下:

我们看一下KafkaReceiver的实现代码:

在他的onStart()方法中可以看到他是创建了一个线程池executorPool来消费消息的。而这个线程池的线程数,就是我们在KafkaUtils.createStream时的入参onlineStaffTopicMap的values的和。也就是说入参onlineStaffTopicMap的value指的是某个topic在这个InputDStream中会有多少个consumer去消费数据。

再看一下MessageHandler中消费及保存数据的逻辑:

这段代码中streamIterator是被我们所喜闻乐见的使用高阶API从kafka消费数据的代码。在代码中消费完数据之后,调用了store方法将message进行了保存。

Store方法最终会将这条消息addData到BlockGenerator类中的currentBuffer:

ArrayBuffer中。

该类中的updateCurrentBuffer方法值得我们关注一下,他是用来将已经收集到的消息封装成一个Block的。

那么这个方法什么情况下会被调用呢,需要看一下blockIntervalTimer的实现类RecurringTimer。

RecurringTimer是一个定时重复执行高阶函数callback的执行器,他是通过Thread反复执行loop方法实现的,loop方法中只要定时器不被终止,就会反复调用triggerActionForNextInterval方法,而triggerActionForNextInterval会在特定的时刻(即nextTime)执行callback函数(即入参updateCurrentBuffer函数)。执行完成之后会在nextTime上增加period作为下一次执行的时刻。

而period方法是什么呢,他就是我们在构建blockIntervalTimer时的入参blockIntervalMs,也就是streaming性能的一个优化点spark.streaming.blockInterval。也就是说,这段代码的逻辑是每间隔blockInterval将由consumer消费到的数据切分成一个block。由此我们可以看到,这个参数是用来将Batch中所接受到的数据以它为时间间隔切分为block,而在streaming处理数据时,会将block作为一个partition来进行分布式计算,也就是说我们在指定的batchTime中,根据blockInterval能切出多少个block,就能分成多少个partition,从而决定了streaming处理时的分布式程度。这一段代码如下:

具体为什么我们说一个block会作为一个partition来进行计算,这一点可以看一下ReceiverInputDStream类的compute方法,该方法调用了createBlockRDD方法来创建基于Receiver模式的RDD。在该方法中可以看到最终封装的RDD为BlockRDD或者WriteAheadLogBackedBlockRDD。

BlockRDD类中getPartitions方法是说将这个batch的blocks作为partitions。Compute方法则按照入参BlockRDDPartition的blockId,从blockManager中获取该block作为partition的数据。getPreferredLocations则是将BlockRDDPartition所在的host作为partition的首选位置。

总结

通过阅读源码我们可以看出,direct的方式是从kafka消费完数据之后直接封装成partition的数据提供给作业使用,而receiver是将消费到数据按照blockInterval切分成block,保存到blockManager中,在使用时会根据blockId获取该数据。

另外direct的方式rdd的partition与topic的partition是一一对应的,如果某个topic只有一个partition就不好了。而receiver的partition是根据blockInterval切分出来的,blockInterval的默认值是200ms,不存在这个问题。

这两种方式在生产环境上用的都比较多,我们一开始采用的是receiver的方式。后来为了实现自定义checkpoint,改为了direct的方式。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容