Spark Structured Streaming 2.4 踩的一些坑

最近参与一个公司大数据项目开始入坑Spark,Spark从2.0开始从RDD 的底层API转向了面向Dataset/Dataframe 的高级API,Spark Streaming 也换成了Structured Streaming,而我们用的是2.4,带上了watermark 功能,对流媒体的处理算是圆满了。

如果希望从框架结构方面了解1.x 和2.x 的差异,我比较推荐下面这个腾讯团队写的wiki

CoolplaySpark

另外下面这篇总结对于想了解2.4 特性也是非常不错

《Spark Structured Streaming》 官方文档解读

而从实际的项目来说,实践了一段时间,还是遇到了几个坑,这里逐一分享

dropDuplicates 算子

这里我就不累赘举例这个算子怎么用了,西面直接说说用这个算子的话小心3点

1) 如果你的项目是用了window机制的话,那么去重的语义一般可能就是说在同一个window下去重,这个语义是比较合理的,那么如果你是需要在同一个window下对数据去重的话,参数是提供window 列就可以了,注意有些人是提供 $timestamp 列是不太对的,但是还有一点要注意,调用这个算子并不是每次只操作一个window,而是hold在你内存的所有消息,因此自己去衡量一下性能。我最终没有用这个算子主要是因为下面这个点

2)dropDuplicates目前只能做keepFirst,而我们的业务需要可能需要做的是keepLask,目前我没有发现有办法可以实现keepLask。

3)看它源码可以知道,其实里面就是做了一次的repartition + reduceByKey, 这里最最最要命的就是它需要shuffle,而我们的业务其实在每个kafka 的partition已经做过一次了,这样的话其实每个分区自己来做去重就好了,而Spark 又没有诸如 dropDuplicatesWithinPartition() 的方法,因此该算子满足不了需求,目前我还没找到很完美的方法,配合用Apache Collection API 基本可以最少限度的破坏数据流处理的语义,可以参考下面的代码

                todayDs.mapPartitions(it -> {
                    Set<Long> gidSet = new HashSet<>(3000);
                    return IteratorUtils.filteredIterator(it,
                            item -> {
                                if(gidSet.contains(item.getGlobalId())) {
                                    return false;
                                } else {
                                    gidSet.add(item.getGlobalId());
                                    return true;
                                }
                            });
                },
                ......

在Structured Streaming with window watermark 下如何提交consumed Kafka 的offset

可以说这个是我遇到最头疼的问题,google了非常多的材料,并且也咨询过一些大厂玩 spark streaming的团队,也没有一个比较好的方案,可能也是由于我们业务本身的特点,无法提供到一个幂等的sink() 语义,因此就使得很难找到一个合理的点去submit Kafka的 offset。

从Spark 2.x Structured Streaming 提供了一些API,可以帮助用户实现这种 end-to-end exactly-once fault-tolerance guarantees 的语义,这个可以参考一下SSS的官方文档,上面有些介绍,而我大概总结了一下,可以这样去理解:

  1. check point 机制,我理解这个应该是一个不down机的机制,当需要去替换一些 worker 或者rolling restart 一些worker的时候,Spark应该是可以从内部的一些checkpoint 恢复出当前工作的taskId或者说是batchId,从而可以恢复之前中断的作业,因此这个状态下是不需要考虑作业的中断或者重哪里开始重新消费的问题

  2. 而有一些场景比如我们发版时往往是需要整个集群restart的时候,就需要一个机制来界定上次消费的offset了,这个问题在下面这个 Stack Overflow 是讨论的比较全面的,而结论很不幸是无法达到我想要的目的

How to get Kafka offsets for structured query for manual and reliable offset management?

其中一个解决方案是采用2.x 提供的Listener机制,在processing 的时候自行去commit 一下offset 到外部持久化地方

这里有两个致命的问题

  • 因为启用了watermark, 消费了地方并不是这次sink 的批次,简单来说,假设window 是1分钟,watermark 是1分钟,那么其实我们洗的数据是2分钟之前的数据,如果我们在上面的listener 上提交了最新的offset,其实重启后我们将会丢失大概2分钟的数据

  • 我假设上面的1 理论上是可以解决,那么下一个问题,还是watermark,因为用了watermark,消息就变相变成了“乱序”,这个怎么理解呢,我们借下面这个图看看


我们一般的预期是从管道的左边往右边顺序消费的,但是watermark破坏了这个规则,假设当前window正在清洗蓝色框的数据,那么我们期望应该是有个机制来标记 j,k 的offset,那么下次就算有问题,我们从这里开始消费就可以了,然而假设现在来了一个迟到的数据m,那么其实它是会归并到了蓝色的window下一起来洗数据的,而这时你很可能记录的是m的下标,那么这时如果是down机的话我们就会丢失了 j 到 m 中间的所有数据了

因此这个问题我想到最终要实现的话必须要满足2点

  • 我们要记录window的起始位置而不是末端位置
  • 这个window的 sink 必须是幂等的

如果大家还有什么可以实现的方案麻烦告知

foreachBatch 是单线程的

这里我们重新温习一下 Structured Steaming的整个流程

  • 从source 源源不断地去捞数据进来
  • 在processing 里其实不管你用了什么算子也好,shuffle也好,SQL view 也好,你看到的其实都是整个内存中所有没有expired 的数据
  • 最后sink 阶段拿到的source,其实是满足了 watermark 后expired了洗出来的结果集

那么foreachBatch 其实是把所有的partition的结果集再汇总回去driver,由driver来处理,而我们项目由于sink的数据比较大,并且是非常重的DB操作,因此导致最后这个阶段非常耗时,上到几十秒都有,而这个东西又阻塞了下一次JOB 的启动,因此就造成了我们的JOB的调度频率非常低下


最后就是抛弃了这个方法,仍然采用 foreach 算子,在里面汇总了一批 partition的数据一个批次提交,实测性能提升了好几倍。

最后

  • 剩下要注意就是一些编程原则问题了,比如
  • 尽量减少RDD 之间转换时一些大对象的大量创建
  • 尽量减少破坏流式处理的编程风格
  • 想办法减少串行处理的耗时,尽量并行化

这些在Spark UI 上可视化信息都非常直观,照着来优化就可以了。

最后关于 Spark 2 非常值得推荐的一本书就是 《Spark - The Definitive Guide - Big data processing made simple》 可惜现在只有英文版,看起来非常费劲,希望早日有大虾翻译成中文。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容