高性能kafka消费

在进行spring-kafka消费的过程中,大部分人可能都遇到过kafka消息堆积的情况,尤其是大数据处理的场景,这时候就要想办法提高消费能力。

提高消费者的数量可以吗?我们知道,kafka的消费者个数是与kafka的分区数相关的,一个分区最多只能被一个消费者消费,也就是说,你在客户端开启的消费者个数即使超过了分区数,也不能提高消费能力,反而还占资源!

那既然一个分区只能被一个消费者消费,那我增加分区数不就可以多开消费者了?这在理论上确实是可行的,但是在运维层面会涉及到分区、数据的迁移,而且这期间kafka是不可用的状态,成本太过高昂。

本方案从消费端的代码层面着手,看如何设计实现一个高性能的kafka消费组件。


一、异步

默认情况下,spring-kafka采用的是同步消费模式,这种情况下消费能力被分区数限制,难以提升。所以我们首先想到的异步消费:

异步消费,将消息的拉取与处理交由不同的线程处理,在硬件允许的前提下,可自由提高消费能力。

异步消费虽然提高了消费能力,但是对于offset的提交却带来了挑战:因为默认情况下,offset是自动提交的,自动提交很可能导致异步未处理完的消息丢失;


二、手动提交

方案:

  1. consumer将offset加入本地列表进行维护;
  2. 异步worker回调更新offset状态;
  3. consumer遍历offset,寻找可提交的最大offset;
  4. 提交最大offset到kafka,并清空本地已提交offset;


三、有序offset列表

从上图可知,为了进行手动提交,其中最关键的部分是手动维护的offset列表,为实现offset的提交,要达到的目标:

  • 减少提交次数,每次只提交已完成的最大offset;
  • offset列表已提交部分及时释放,保持弹性;
  • 尽量保证offset更新的有序性,提交时前面offset已完成,避免出现头部真空现象;
101~109处理完之后,只有3次offset提交动作,109提交之后,109及之前的offset都会从offset列表中清除。

但现实情况是:

  • 异步worker执行进度无法预料,对offset回调更新无法保证顺序;
  • 极有可能出现头部长时间无更新的真空现象;
  • 极端情况下,由于头部offset一直不更新导致offset列表一直得不到释放从而导致内存溢出;
103、105、106虽然都处理完了,但是都还不能提交,因为这时候101和102都还未被处理。

极端情况下会出现以下情形:

随着时间的推移,低位的offset始终未被处理,这会导致后面所有已处理的offset都得不到提交,同时由于高位offset不断写入,低位的offset得不到清除,offset列表将急剧膨胀。



问题:在异步执行的情况下怎样实现offset列表更新的有序性?


四、传统线程池

传统线程池调度策略:

  1. 当线程数< corePoolSize时,直接创建新线程执行任务;
  2. 当corePoolSize<线程数<maxPoolSize时,如果队列未满,任务入队等待;
  3. 当corePoolSize<线程数<maxPoolSize时,如果队列已满,创建新线程执行任务;
  4. 当线程数=maxPoolSize,且队列已满,则执行拒绝策略;

从传统线程池的调度策略来看,它是惰性的,这对我们当前场景来说有哪些问题?

  • 在第3步中,后到的任务可能会比先到的任务先执行完;
  • 在第4步中,拒绝策略会导致任务抛弃或由主线程插队;
    这两点都会导致无法保证任务的顺序执行!


五、饥渴线程池

目标:

  • 确保先到的任务先执行;
  • 保证任务等待时的公平性;
  • 不能丢弃任务;

方案:

  1. 改变传统线程池调度策略;
  2. 使用公平队列保证任务等待的公平性;
  3. 完全摒除拒绝策略;
  4. put instead offer;
对传统线程池关键部分进行改写

新的调度策略:

  1. 当线程数< corePoolSize时,直接创建新线程执行任务;
  2. 当corePoolSize<线程数<maxPoolSize时,强制创建新线程执行任务;
  3. 当线程数=maxPoolSize,入队列等待;


六、微批处理

目标:

  • 提高末端业务执行效率,最大化提升性能;
  • 降低本地offset列表内存开销;
  • 提高offset提交至kafka的效率;

方案:

  1. 基于micro-batch微批思想,consumer线程对批量消息进行切分、整合,将微批数据交给不同的worker线程处理;
  2. offset列表仅记录micro-batch中最大的offset;
  3. 仅提交最大offset到kafka;
从kafka批量拉取200条消息,在内部进行分片、切分、合并,以50一组形成微批,提交给4个worker处理,同时,本地offset列表只记录每个微批中最大的offset:50、100、150、200,这样在提高末端业务执行效率、offset提交效率的同时,也极大地节省了内存开销。



微批处理要求业务必须具有原子性,micro-batch这一批要么全部成功,要么全部失败!


七、last offset处理

问题:

  • consumer线程在提交offset时,秉着‘有多少就提交多少’的思想,并不等待,所以在异步的情况下会出现最后一批消息的若干offset得不到提交的情况;
  • 如果后续没有业务消息产生,那么这若干offset将永远得不到提交,如果此时发生rebalance,将发生重复消费;

方案:
基于sideCar模式,提供额外的监视器,动态监测本节点的offset列表,如若发现offset列表仍有未提交的offset,则会主动发送探针消息,驱动consumer进行poll及commit;


八、重写与重排

问题:

  • 发生rebalance时,consumer实例可能会发生变化,用已过时的consumer提交offset会失败;
  • 发生rebalance时,可能会发生重复消费,本地offset列表会重复添加;
  • 发生rebalance时,由于micro-batch机制,拉取的offset可能比已有的所有offset要小,造成offset列表乱序;

方案:

  1. 加入offset列表时,检查consumer实例,如果发生变化,则对consumer进行重写;
  2. 滤重检查及offset列表重排序;


九、分区重分配

问题:

  • 发生rebalance时,节点被分配的分区也可能发生变化,这时节点之前保存的未提交offset列表就成了脏数据,并常驻内存;
  • 脏数据导致探针消息不停地被发送,但是本节点消费不到,引起网络滥发;

问题思考(怎么判断offset列表为脏数据?):

  • 判断offset列表大小不变为什么不可以?
    ——可能存在每次巡检时大小都不变的情况;
  • 判断offset列表内容不变为什么不可以?
    ——可能线程池繁忙,新offset添加一直等待导致列表不变化;

方案:
每次添加offset时记录offset列表的更新时间戳,Offset Monitor定时检测,当经历了一个rebalance周期后,如果时间戳仍未更新,判断offset列表为脏数据,予以清除。

方案思考

  • 时间戳长时间不更新意味着什么?
    —— 1.分区重分配;2.线程池繁忙,新offset添加不进去;
  • 线程池繁忙的场景,为什么offset列表不会被误杀?
    —— 把时间线拉长,新offset加入等待一个rebalance周期后,一定会rebalance;
关键代码



这个案例给我们的启发:从空间维度转变为时间维度去判断,这种思考维度的转变以及思考尺度的放大,降低了解决问题的复杂性,提供了更多的可能性。


十、极致性能

问题:

  • 传统队列在读写时存在锁争用,在高并发场景下,线程不停地被挂起、恢复,上下文切换过程存在着很大的开销;
  • 在x86架构下的CPU,在高并发场景下很容易形成伪共享(多个线程操作不同的变量,但是变量处于相同的缓存行,修改变量会使缓存行失效,甚至发生跨槽读取);
传统队列关键代码

Disruptor:
Disruptor是一个高性能的队列,通过以下设计解决传统队列的锁争用及伪共享问题:

  • 无锁设计:采用CAS无锁方式,保证线程安全,并提高效率;
  • 环形数组:可避免频繁的垃圾回收,同时数组对处理器的缓存机制更加友好;
  • 元素定位:环形数组长度为2^n,通过位运算,能快速取到元素;
  • Cache padding:通过添加额外的无用信息,避免伪共享引发的性能问题。
cas与cache padding



关键代码


十一、总体架构


十二、效果测试

服务器:3 * 6核8G
消费逻辑:将14万CDC消息计算更新后写入ES
消费配置:kafka 6分区、批量拉取500、32线程、2048队列
效果对比:

  • 不使用组件:81分钟
  • 亲缘线程池:6分41秒
  • Disruptor:4分51秒
  • 饥渴线程池:4分11秒
  • 饥渴 + 微批:1分35秒
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容