深入Kafka系列(二) consumer开发

本篇介绍Kafka consumer开发相关的内容。

1. consumer基本概念

1.1 消费者

Kafka消费者就是各种读取Kafka集群消息的应用程序。值得注意的是consumer的版本和分类。旧版本的consumer是由Scala开发的,在Kafka 0.9.0.0版本后推出Java开发的新版本consumer。新旧版本的consumer在设计上、API上有很多不同,所以要注意区分。本文以新版本的consumer做介绍。

此外在具体的应用场景中,consumer还可以分类成:

  • 消费者组(consumer group),多个消费者实例组成一个整体消费消息

  • 独立消费者(standalone consumer),单独执行消费操作。

1.2 消费者组

consumer group的定义:

消费者使用group.id来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。

可以看出三点:

  • 一个consumer group有若干个consumer实例,consumer实例可以是一个线程,或者运行在其他机器的进程。

  • 同一group内,topic某个分区的消息只能发送给group下的一个consumer实例

  • topic可以发送给多个group

通过consumer group就可以实现两种消息引擎模型:

  • 基于队列模型:所有consumer实例属于相同group

  • 基于发布/订阅模型:consumer实例属于不同group

consumer group用于实现高伸缩性,高容错性的consumer机制。在consumer group内,若某一个consumer实例“挂”掉,可以将该consumer负责的分区交给其他consumer负责,即所谓的rebalance,保障整个group可以继续工作。

1.3 offset提交

每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少条信息。Kafka中,consumer group保存offset信息在Kafka的一个内部topic,即__consumer_offsets中。

__consumer_offsets是Kafka内部topic,专门保存offset。

一般情况下,__consumer_offsets有50个分区。保存的每条消息格式为KV:<group.id+topic+分区号,offset>。

当consumer group提交offset时,对group.id做哈希求模,分散在50个分区中。每次更新同一个key的最新offset值时,该topic就会写入一条含有最新offset的信息,同时Kafka会定时对__consumer_offsets做compact操作,控制日志容量。

2. 构建consumer

2.1 代码实例

0.jpeg

2.2 详细步骤

构造consumer应用程序的基本步骤为:

  1. 构造Properties对象,配置consumer参数。必须指定的参数有bootstrap.servers,group.id,key.deserializer,value.deserializer。

  2. 构造KafkaConsumer对象

  3. 订阅topic

  4. 循环调用KafkaConsumer.poll方法获取ConsumerRecord的消息

  5. 根据业务逻辑处理ConsumerRecord对象

  6. 关闭KafkaConsumer

2.3 consumer其他参数

  • session.timeout.ms:consumer group检测组内成员是否崩溃的时间间隔,默认10s

  • max.poll.interval.ms:consumer处理逻辑最大时间

  • auto.offset.rest:指定了无位移信息或位移越界时kafka的应对策略。分别是earliest、latest

其他参数可以参阅官网。

3. 消息轮询

Kafka在读取消息时,是要能够同时读取多个topic的多个分区消息。Kafka采用的是一个线程同时管理多个Socket连接,即同时与多个broker通信实现消息的并行读取。

Kafka通过重复性调用poll方法获取消息,每次poll方法返回的都是订阅分区上的一组消息。

4. 位移管理

4.1 consumer位移

常见的3种消息交付语义保证:

  • 最多一次(at most once),消息可能丢失,不会重复。消息在消费之前提交位移就可以实现。若consumer在提交位移和消费消息之间崩溃,重启后从最新的offset开始消费,这样那条消息就丢失了。

  • 最少一次(at least once),消息不会丢失,可能重复。消息在消费之后提交可以实现。但是Kafka无法保证提交位移和消费消息在一个事务中完成,所以可能重复消费。Kafka默认就是提供at least once的处理语义。

  • 精准一次(exactly once),消息一定被处理且只处理一次。在0.11.0.0版本中正式支持事务以及精准处理的需求。

4.2 consumer位移管理

consumer会在Kafka集群所有broker中选择一个作为consumer group的coordinator,用于实现组员管理、消费分配方案制定、位移提交等。

consumer提交位移的机制是:通过向所属的coordinator发送位移提交请求实现,每个位移提交请求都会写进__consumer_offsets对应分区上追加一个新消息。消息的key是<group.id,topic,分区号>,value是offset。

Kafka可以选择手动提交和自动提交offset。手动提交offset可以实现用户自行确定消息何时被真正处理完并提交位移。在较强的精准一次处理语义时,需要用户自行实现手动位移提交。

手动提交API又分为同步手动提交(commitSync)和异步手动提交(commitAsync)。调用commitSync,程序会等待位移提交结束后才执行下一条命令;若调用commitAsync,则是一个异步非阻塞调用,程序会在poll方法中不断轮询这次异步提交的结果。

5. 重平衡(Rebalance)

5.1 rebalance概述

rebalance规定了consumer group如何达成一致分配订阅topic的所有分区。新版本中Kafka通过coordinator完成rebalance。

rebalance触发的三个条件有:

  • 组成员发生更变,比如有consumer的加入、离开、崩溃

  • 组订阅topic数有变更

  • 组订阅topic的分区数发生变更

默认提供了三种策略:range策略、round-robin策略和sticky策略。

5.2 rebalance流程

consumer group在执行rebalance前必须确定coordinator所在broker,然后执行rebalance过程:

  1. 加入组。组内所有consumer向coordinator发送JoinGroup请求,coordinator从中选择一个consumer当leader,并把所有成员信息和订阅信息发送给leader。

  2. 同步更新分配方案。leader指定分配方案,根据分配策略决定consumer负责那些topic的哪些分区,分配完成后会把分配方案封装进SyncGroup请求发送给coordinator。coordinator再把分配方案中属于每个consumer的方案单独抽取出来response返回各自的consumer。

6. 多线程消费

一般情况有两种解决方法:第一是每一个线程维护一个KafkaConsumer;第二是单个KafkaConsumer实例+多Worker线程。具体实现方法还请参考原文,或者百度相关代码。

更多内容还请参考《Apache Kafka实战》第五章。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容