最近参与一个公司大数据项目开始入坑Spark,Spark从2.0开始从RDD 的底层API转向了面向Dataset/Dataframe 的高级API,Spark Streaming 也换成了Structured Streaming,而我们用的是2.4,带上了watermark 功能,对流媒体的处理算是圆满了。
如果希望从框架结构方面了解1.x 和2.x 的差异,我比较推荐下面这个腾讯团队写的wiki
另外下面这篇总结对于想了解2.4 特性也是非常不错
而从实际的项目来说,实践了一段时间,还是遇到了几个坑,这里逐一分享
dropDuplicates 算子
这里我就不累赘举例这个算子怎么用了,西面直接说说用这个算子的话小心3点
1) 如果你的项目是用了window机制的话,那么去重的语义一般可能就是说在同一个window下去重,这个语义是比较合理的,那么如果你是需要在同一个window下对数据去重的话,参数是提供window
列就可以了,注意有些人是提供 $timestamp 列是不太对的,但是还有一点要注意,调用这个算子并不是每次只操作一个window,而是hold在你内存的所有消息,因此自己去衡量一下性能。我最终没有用这个算子主要是因为下面这个点
2)dropDuplicates目前只能做keepFirst,而我们的业务需要可能需要做的是keepLask,目前我没有发现有办法可以实现keepLask。
3)看它源码可以知道,其实里面就是做了一次的repartition + reduceByKey, 这里最最最要命的就是它需要shuffle,而我们的业务其实在每个kafka 的partition已经做过一次了,这样的话其实每个分区自己来做去重就好了,而Spark 又没有诸如 dropDuplicatesWithinPartition() 的方法,因此该算子满足不了需求,目前我还没找到很完美的方法,配合用Apache Collection API 基本可以最少限度的破坏数据流处理的语义,可以参考下面的代码
todayDs.mapPartitions(it -> {
Set<Long> gidSet = new HashSet<>(3000);
return IteratorUtils.filteredIterator(it,
item -> {
if(gidSet.contains(item.getGlobalId())) {
return false;
} else {
gidSet.add(item.getGlobalId());
return true;
}
});
},
......
在Structured Streaming with window watermark 下如何提交consumed Kafka 的offset
可以说这个是我遇到最头疼的问题,google了非常多的材料,并且也咨询过一些大厂玩 spark streaming的团队,也没有一个比较好的方案,可能也是由于我们业务本身的特点,无法提供到一个幂等的sink() 语义,因此就使得很难找到一个合理的点去submit Kafka的 offset。
从Spark 2.x Structured Streaming 提供了一些API,可以帮助用户实现这种 end-to-end exactly-once fault-tolerance guarantees 的语义,这个可以参考一下SSS的官方文档,上面有些介绍,而我大概总结了一下,可以这样去理解:
check point 机制,我理解这个应该是一个不down机的机制,当需要去替换一些 worker 或者rolling restart 一些worker的时候,Spark应该是可以从内部的一些checkpoint 恢复出当前工作的taskId或者说是batchId,从而可以恢复之前中断的作业,因此这个状态下是不需要考虑作业的中断或者重哪里开始重新消费的问题
而有一些场景比如我们发版时往往是需要整个集群restart的时候,就需要一个机制来界定上次消费的offset了,这个问题在下面这个 Stack Overflow 是讨论的比较全面的,而结论很不幸是无法达到我想要的目的
How to get Kafka offsets for structured query for manual and reliable offset management?
其中一个解决方案是采用2.x 提供的Listener机制,在processing 的时候自行去commit 一下offset 到外部持久化地方
这里有两个致命的问题
因为启用了watermark, 消费了地方并不是这次sink 的批次,简单来说,假设window 是1分钟,watermark 是1分钟,那么其实我们洗的数据是2分钟之前的数据,如果我们在上面的listener 上提交了最新的offset,其实重启后我们将会丢失大概2分钟的数据
-
我假设上面的1 理论上是可以解决,那么下一个问题,还是watermark,因为用了watermark,消息就变相变成了“乱序”,这个怎么理解呢,我们借下面这个图看看
我们一般的预期是从管道的左边往右边顺序消费的,但是watermark破坏了这个规则,假设当前window正在清洗蓝色框的数据,那么我们期望应该是有个机制来标记 j,k 的offset,那么下次就算有问题,我们从这里开始消费就可以了,然而假设现在来了一个迟到的数据m,那么其实它是会归并到了蓝色的window下一起来洗数据的,而这时你很可能记录的是m的下标,那么这时如果是down机的话我们就会丢失了 j 到 m 中间的所有数据了
因此这个问题我想到最终要实现的话必须要满足2点
- 我们要记录window的起始位置而不是末端位置
- 这个window的 sink 必须是幂等的
如果大家还有什么可以实现的方案麻烦告知
foreachBatch 是单线程的
这里我们重新温习一下 Structured Steaming的整个流程
- 从source 源源不断地去捞数据进来
- 在processing 里其实不管你用了什么算子也好,shuffle也好,SQL view 也好,你看到的其实都是整个内存中所有没有expired 的数据
- 最后sink 阶段拿到的source,其实是满足了 watermark 后expired了洗出来的结果集
那么foreachBatch 其实是把所有的partition的结果集再汇总回去driver,由driver来处理,而我们项目由于sink的数据比较大,并且是非常重的DB操作,因此导致最后这个阶段非常耗时,上到几十秒都有,而这个东西又阻塞了下一次JOB 的启动,因此就造成了我们的JOB的调度频率非常低下
最后就是抛弃了这个方法,仍然采用 foreach 算子,在里面汇总了一批 partition的数据一个批次提交,实测性能提升了好几倍。
最后
- 剩下要注意就是一些编程原则问题了,比如
- 尽量减少RDD 之间转换时一些大对象的大量创建
- 尽量减少破坏流式处理的编程风格
- 想办法减少串行处理的耗时,尽量并行化
这些在Spark UI 上可视化信息都非常直观,照着来优化就可以了。