Kafka可靠性机制

一、组件

介绍一下kafka进行数据复制时会涉及到的一些组件概念

  1. zookeeper:维护集群信息,当broker加入或退出时,kafka通过订阅zookeeper就能获得通知

  2. broker:一个独立的kafka服务器称为一个broker。broker接收来自生产者的消息,为消息设置位移,并将消息刷入到磁盘里。broker并且提供消费者服务,对读取的分区数据提供响应。

  3. 控制器/Controller:除了有一般broker的功能外,还会负责分区首领的选举,使用epoch来控制“脑裂”。

    集群里第一个启动的broker通过在Zookeeper里创建一个临时节点/controller使自己成为控制器,其他的broker节点在启动时也会尝试创建这个节点,但会提示失败,因为已经存在了,其它broker节点会在Zookeeper创建/watcher节点去感知控制器的状态,当控制器被关闭或者离开集群了,他们会再次尝试创建/controller节点重复同样的操作。

    新选举出来的控制器,会得到一个递增的controller epoch,其它broker在得知当前的controller epoch后,会忽略旧控制器发出的消息,避免了脑裂的现象。

    控制器可以进行broker分区选举。当分区首领所在的broker离开集群时,控制器遍历这些分区,并确定哪个副本会成为新的分区首领,然后向所有broker发送请求,该请求包含谁是新leader谁是follower,随后新首领开始处理来自生产者和消费者的请求,而follower开始从leader处复制消息

  4. 分区:kafka使用主题来组织数据,每个主题被划分为若干个分区,每个分区可以有若干个副本,分区分配遵循同一分区副本均匀分布在不同broker上。

    例如有4个broker,创建一个包含10个分区的主题,复制因子设置为2,那么总共有20个副本,可以按照如下方式分配给4个broker:

    1、若未指定机架信息,随机指定一个broker0,首领分区0分配给broker0,首领分区1分配给broker1,以此类推......随后从分区首领后开始,依次分配跟随者副本,例如分区0的首领在broker0,那么它的第一个跟随者副本会分配给broker1......

    2、若指定了机架信息,例如broker0和broker1在机架1,broker2和broker3分别在不同的机架,那么分区副本需要按照broker0,broker2,broker1,broker3进行交替分配

  5. 副本:分为首领(leader)副本和跟随者(follower)副本。

    • leader副本处理所有的写入和访问请求,另外会通过与follower保持状态的交互,维护一个isr列表;

      broker在处理请求时,如果收到一个包含特定分区的生产和读取请求,但是该分区的leader副本并不在该broker上,会导致报错。

      客户端会采用元数据请求方式,服务器会给出对应的响应,响应的消息会指明特定的主题,主题的分区、分区的副本以及副本leader信息,然后客户端会缓存起来便于下次直接访问。并会时不时更新元数据信息

    • follower的任务是复制leader的消息,保持与leader的一致性

    • ISR机制:每个分区都有一个ISR列表,用于维护所有的同步副本。leader副本必须是同步的,follower副本要满足两个条件才算是同步副本:

      1. 定时向zk发送心跳消息,保持与zk的活跃会话
      2. 持续向leader副本请求消息,在允许的消息量/时间延迟范围内保持与leader副本的消息同步(副本LEO落后于leader LEO的时长不大于replica.lag.time.max.ms参数值)
  6. LEO:日志末端位移,记录每个副本中下一条消息的偏移量

  7. HW:水位值,记录当前topic已提交的偏移量。也即消费者能消费到的最大偏移量

  8. Leader Epoch

二、消息的可靠性传递

  1. broker有3个配置可影响消息存储可靠性

    1. 复制系数:主题级别的配置参数是replication.factor,broker级别可以通过default.replication.factor来配置自动创建的主题;更高的复制系数可以带来更高的可用性、可靠性,但是也需要消耗更多的存储空间

    2. 不完全的首领选举:unclean.leader.election只能在broker级别配置,默认值是enable。

      当分区首领不可用时,一个同步副本会被选为新首领,如果在选举过程中没有出现数据丢失,那么这个选举就是完全的。如果允许不同步的副本成为分区首领,那么需要承担丢失数据和数据不一致的风险,如果不允许,那么就要接受较低的可靠性

    3. 最小同步副本:主题和broker级别上都可以配置参数min.insync.replicas参数,如果当前同步副本的个数小于这个参数时,那么生产者将不能往主题分区写入数据,分区也变成了只读状态。

  2. 生产者配置

    1. 发送确认配置:acks可配置3中不同的确认模式
      • acks=0:生产者能够把消息发送出去,则认为消息已成功写入kafka,这种配置可以得到最大的吞吐量带宽利用率,但是却最不稳定最有可能丢失数据
      • acks=1:分区首领在收到数据后写入分区数据文件时会返回确认或者失败的消息,如果生产者能正确处理错误消息,会重试尝试发送消息,最终消息会成功写入到分区首领。这种配置方式也有造成丢失数据的风险,当消息写入分区leader但是在follower复制时leader崩溃了
      • acks=all:生产者在消息写入分区首领和所有的副本后才确认消息被写入,这个参数会配合最小同步副本来使用,在确认最小写入副本数成功后就能返回继续处理下一条消息的继续写入。这种配置可靠性最高,但是吞吐率最低
    2. 配置重试次数:对于可重试解决错误的事件,生产者可以尝试重新发送消息;对于不可重试解决错误的事件,多次重试已失去意义,可以直接丢弃或保存到磁盘再后续处理。重试次数的配置主要看重试的目的是什么。
    3. 额外的错误处理:对于重试机制不能解决的错误,例如消息序列化失败,生产者重试次数达到上限,需要开发人员自行捕获异常并处理。
  3. 消费者可靠性配置

    1. 自动提交偏移量

      • enable.auto.commit(消费者再均衡后会有消息重复消费的情况)
      • auto.commit.interval.ms(自动提交开启,默认提交间隔是5s)
    2. 手动提交偏移量

      enable.auto.commit参数设置为false,手动提交偏移量分两类

      • 手动提交当前轮训的最大偏移量
      • 手动提交固定偏移量

      api分同步提交和异步提交两类

      • 同步提交:提交失败消息后阻塞,消费者进行自动重试,保证消息能够最大限度地提交成功,但会降低吞吐量

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            /*同步提交*/
            consumer.commitSync();
        }
        
      • 异步提交:提交失败后不能自动重试,但是可以通过一个Map<TopicPartition, Integer> offsets对象来维护每个分区提交的偏移量,如果失败的偏移量小于最后一次已提交的偏移量,则不需要重试

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
            /*异步提交并定义回调*/
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                  if (exception != null) {
                     offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
                                                                    x.topic(), x.partition(), y.offset()));
                    }
                }
            });
        }
        
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,264评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,549评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,389评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,616评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,461评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,351评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,776评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,414评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,722评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,760评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,537评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,381评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,787评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,030评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,304评论 1 252
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,734评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,943评论 2 336

推荐阅读更多精彩内容