Kafka-偏移量

偏移量提交带来的再平衡问题

消费者提交偏移量的主要是消费者往一个名为_consumer_offset的特殊主题发送消息,消息中包含每个分区的偏移量。 如果消费者一直运行,偏移量的提交并不会产生任何影响。但是如果有消费者发生崩溃,或者有新的消费者加入消费者群组的时候,会触发 Kafka 的再均衡。这使得 Kafka 完成再均衡之后,每个消费者可能被会分到新分区中。

为了能够继续之前的工作,消费者就需要读取每一个分区的最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

但是这样可能会出现如下的问题。

提交偏移量小于客户端处理的偏移量

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

提交偏移量大于客户端处理的偏移量

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。 因此,如果处理偏移量,会对客户端处理数据产生影响。

KafkaConsumer API 提供了很多种方式来提交偏移量。


自动提交

自动提交是 Kafka 处理偏移量最简单的方式。

当 enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。

这是因为提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。

消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

但是使用这种方式,容易出现提交的偏移量小于客户端处理的最后一个消息的偏移量这种情况的问题。

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到5s的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。

虽然可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗的时间跨度,不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。


手动提交

大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。 这是我们需要把把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。

同步提交

使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

代码示例如下:

commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。

如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

同时在这个程序中,只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里。

异步提交

同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。 这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker 的响应。

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。 它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息。 commitAsync() 也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标。如果要用它来进行重试,则一定要注意提交的顺序。

同步和异步混合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此在这种情况下,我们应该考虑使用混合提交的方法:

在程序正常运行过程中,我们使用 commitAsync 方法来进行提交,这样的运行速度更快,而且就算当前提交失败,下次提交成功也可以。 如果直接关闭消费者,就没有所谓的“下一次提交”了,因为不会再调用poll()方法。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。

提交特定的偏移量

如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。 这时候需要使用一下的两个方法:

消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。 假设处理了半个批次的消息,最后一个来自主题“customers”分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。 代码如下:

这里调用的是 commitAsync(),不过调用commitSync()也是完全可以的。在提交特定偏移量时,仍然要处理可能发生的错误。


监听再均衡

如果 Kafka 触发了再均衡,我们需要在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。可能还需要关闭文件句柄、数据库连接等。 在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。ConsumerRebalanceListener 有两个需要实现的方法。 public void onPartitionsRevoked(Collection partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。 public void onPartitionsAssigned(Collection partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。 下面的例子将演示如何在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量。

如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容