kakfa从入门到放弃

一)初识kafka

消息中间件的使用已经越来越广泛,基本上具有一定规模的系统都会用到它,在大数据领域也是个必需品,但为什么使用它呢?一个技术的广泛使用必然有它的道理。

背景与问题

以前一些传统的系统,基本上都是“用户——系统——数据库”一条线,拿下单做例子,用户下单,系统接受并处理请求,把数据存到数据库。

这样的好处就是简单,但随着需求越来越多,用户量越来越大,系统需要承载的压力就越大;如果需要扩展系统,修改代码,牵一发动全身,麻烦滴很。

消息队列可以解决这些问题,它是一个存放消息的队列,生产者往队列推数,消费者从队列取数。

优点

解耦

一个系统一般都有很多个模块,但业务发展起来,系统的体量就跟着变大,就需要多做几个模块,然而每拓展一个模块就要多各种调用。
以一个交易系统为例,当完成一次交易,系统需要通知推荐系统、广告系统等。当多出一个模块,系统就要增加一个调用,从而需要修改代码。
要是你觉得改改代码不麻烦,可是改完,后面有一个模块出问题了咋办,一点一点排查,改错地方了还影响到了其他模块。
如果增加消息中间件,各个模块只需要完成各自的工作,然后将消息发到消息队列,由其他模块去取或者消息队列推送,就可以解决耦合的问题了。

异步

传统系统的话,一条路走到底,比如购买商品,完了扣除优惠券,再给你积点分。这每个流程可能就花一点时间,但合起来就很久了。
加上消息队列,我直接完成我的工作,再给队列,队列再通知其他模块,这不仅省事儿,还减少了不必要的时间浪费。

削峰

还是以传统系统为例,当并发量大的时候直接怼到数据库,数据库承受的压力得多大呀这是。欸,那就加个消息队列,把请求扔到消息队列,慢慢处理。

缺点

技术嘛,总是有好有坏,刚才说了它的优点,现在简单唠唠它的缺点。

首先,降低了系统的可用性,好好的一个系统,加一个中间件,如果它挂了,后面不得凉凉。

为了防止它挂掉或者挂掉了修复它,是不是得维护?是不是增加了运维成本?

不仅如此,还要考虑数据一致性问题,以及重复消费的问题,还要保证消息的可靠传输。要考虑的东西有多少,系统的复杂性就有多高。

消费模式

消息中间件一般有两种消费模式,一种是点对点模式,一种是发布订阅模式。

点对点是一种一对一的模式,一般消息只由一个消费者消费,导致消息没法复用;

发布订阅模式是一种常见的模式。消费者订阅,当有消息来的时候通知消费者。这种模式也分为两种情况,一种是由消息队列推送,类似公众号订阅一样,只要消费者订阅了,消息一来就推给订阅了的消费者。

但是这种方法也有缺点,因为消费者的处理速度不一样,有快有慢,容易出现问题。比如消息队列推送的速度为100M/s,消费者A处理速度为10M/s,消费者B处理速度为500M/s,这时候A就崩了,对于B来说,又造成资源浪费。

所以由消费者主动拉取的方式诞生了,由消费者主动拉数据,解决了上面的问题,但技术有优点的同时,一般都存在缺点。由于消费者要主动拉取,需要维护一个长轮洵去询问队列,但当遇到长时间没有消息的情况,就造成了资源浪费。

本文的主角 Kafka 是基于拉取的发布订阅模式。

kafka基础架构

Producer:生产者,发布消息的对象,将消息推到 Kafka 集群

Consumer:消费者,消费消息的对象

Consumer Group:消费者组,Kafka 中可以将多个 Consumer 分为一个组,从整体上可以将它看作是一个Consumer

Broker:一台 Kafka 服务器就是一个 Broker,多个 Broker 组成一个 Cluster

Topic:消息的主题,可以理解为一个消息队列

Partition:分区,一个 Topic 可以分为多个 Partition,这样的好处是负载均衡;同时,一个 Partition 可以有多个副本,提高可靠性。对于 Consumer Group 来说,一个消费者组中的消费者只能订阅同一个 Topic 的不同分区,可以提高效率,又避免重复消费。

Leader & Follower:对于同一个 Partition 而言,消费者只读取 Leader 的消息,而不会读取 Follower 的消息,Follower 是 Leader 的副本,在 Leader 挂掉的时候 Follower 可成为 Leader

Zookeeper: Kafka 是基于 zk 的,用于集群管理

kafka架构为什么这样设计

如果消息中间件只有一台机,哪天突然宕机了,整个系统就崩了。因此需要整一个集群,搞多台服务器,所以我们搞几个 Broker。

然后生产者准备发送消息了,如果正巧所有的消息都随机地发到其中某一台机器上,流量全上去了,生产者消费者都来找他,看着其他机器都在摸鱼,它突然不干了。

于是要合理分配工作,整出了 Partition,每个 Topic 对应每个生产者和消费者,同一个 Topic 又分成多个分区,分别在不同的 Broker,分担了单台节点的压力。

不过现在又有一个问题,如果一台 Broker 宕机,该节点上的分区数据也没了。为了防止单节点故障造成数据丢失,每个分区存几个副本保存在其它 Broker。

但消费者只能访问其中一个分区,不然会造成重复消费的现象,所以要区分好 Leader 和 Follower,并使消费者只能访问 Leader,而 Follower 需要在 Leader 发生故障的时候成为新的 Leader。

二)生产者

消息中间件必须与生产者和消费者一起存在才有意义,这次先来聊聊Kafka的生产者。

在开始之前,先了解一下消息在Kafka中是如何存储的,如下图所示,一般我们称那些数字为offset(偏移量)一般来说,消息在持久化后应该是有序的,这里的有序是针对分区的,而不是针对 Topic 的。

而且,生产者写入消息时,是往 Leader 写入,Follower 从 Leader 进行复制。

别看生产者只是发消息,调用 API 也是几行代码,但它的学问多着呢。为更好地理解后面的内容,请带着以下问题阅读:

  • 生产者发送消息前会做什么准备?
  • 生产者发送消息怎么保证数据不丢失?
  • 生产者发送消息如何保证消息有序性?
  • 生产者发送的消息是怎么分区的?
    生产者设计了一个缓冲池,可以通过修改 buffer.memory 参数设置其大小;缓冲池内又有多个 Batch,当有多个消息需要写入同一个分区时,消息会先往 Batch 里面写入,等消息达到 batch.size 的时候开始发送,如果 batch.size 设置太小,生产者会频繁发送消息,带来更多的网络开销;

有些读者可能有这个疑问,如果有时候生产者生产的消息很少很小,一直达不到批次的大小,而消费者对时效性要求比较高,这种情况怎么办?其实,默认情况下,只要有线程,即使批次里只有一条消息,也会直接发送出去。但是,可以设置参数 linger.ms 来指定等待消息加入批次的时间,只要当批次消息达到 batch.size 或者等待时间达到 linger.ms 的时候,消息就会发送。

除此之外,生产者可以对消息进行压缩,以降低网络开销以及存储开销,通过设置参数 compression.type 设置相应的压缩算法。

先抛开 Kafka 现有确认机制,假如一条消息发到对应分区后,没有任何确认就紧接着发送第二条,很难不造成数据丢失。

于是我们让分区在收到消息后返回确认消息给生产者,生产者收到后发送下一条。

就这样,消息很顺利地发着,正好在 Leader 拿到最新的消息并返回确认给生产者的时候,Leader 挂了,此时,Follower 还没同步最新的消息,而生产者已经接收到了分区返回的确认,这时候还是丢了数据。

因此我们让 Leader 以及参与复制的 Follower 都收到消息后返回确认,这样就能最大程度保证消息不丢失,不过延迟较高。

针对上述的情况,Kafka 设置了一个 acks 参数,指定了必须有几个副本收到消息生产者才认为是写入成功了。

  • acks=0,生产者只管写入,不会等待 Broker 返回响应,默认成功。这种情况最容易造成数据丢失,不过吞吐量最高;

  • acks=1,Leader 收到消息后响应,生产者才认为写成功,这种也会造成丢失;

  • acks=all,Kafka 集群内部会维护一个副本清单 ISR(后续会写,再此不做描述),当 ISR 里的所有副本都收到消息,才认为写入成功,最大程度保证消息不丢失,不过可能会造成延迟较高。

另外,Kafka 还有一个参数 retries,表示当消息发送失败后,生产者重试的次数,默认为0,如果对丢失消息零容忍,那就不能设置为0.

事实上,生产者在收到分区返回的确认消息前,还是可以持续发送消息的,这个可以通过设置 max.in.flight.requests.per.connection 参数进行修改,这个参数指定了生产者在收到响应前可以发送多少个消息。

这里需要注意的是,如果这个参数不为1,而 retries 参数也不为 0 的时候,当发生重试的时候,有可能造成分区数据顺序错乱。在有些场景下,顺序是很重要的,比如分析交易流水的过程,某个第一次存款的客户先存1块钱再取1块钱是正常的,但反过来可能就有点奇怪了。

所以,如果要保证数据不丢失,同时要保证数据有序性,就需要将 retries 设置为非 0 整数,max.in.flight.requests.per.connection 设置为 1,注意不是 0.

生产者可以指定键作为分区键,如果不指定,生产者会使用轮询算法将消息均匀的发到各个分区上。

但如果指定了分区键,Kafka 会使用自己的 hash 算法获得 hash 值,然后根据 hash 值发到相应的分区。

到这,回顾一下前面的几个问题,是不是有点豁然开朗了?

三)消费者

消费者与消费者组

在Kafka中消费者是消费消息的对象。假设目前有一个消费者正在消费消息,但生产数据的速度突然上升,这时候消费者会有点力不从心,跟不上消息生产的速度,这时候咋办呢?

我们对消费者进行横向扩展,加几个消费者,达到负载均衡的作用。但是要做点限制吧,不然几个消费者消费同一个分区的消息,不仅没办法提高消费能力,还会造成重复消费。因此让他们分别消费不同的分区。

在Kafka中的消费者组就是如此,一个消费者组内的消费者订阅同一个Topic的数据,但消费不同分区的数据,提高了消费能力。

但是消费者组里的消费者数量建议不要超过分区数量,不然就浪费资源。

LEO & HW

Kafka中的分区是可以有多个副本的,我们把每个副本中待写入的那个offset称为LEO(Log End Offset),把最少消息的那个副本的LEO称为HW(High Watermark)

对于消费者而言,消费者所能消费的区间就是小于HW那部分,即图中 0-3 部分。这样消费者不管是哪个副本,订阅到的消息都是一致的,即使换了leader也能接着消费。

提交偏移量

假如一个消费者退出,另一个消费者接替它的任务,这时候就需要知道上一个消费者消费到了哪条数据,因此消费者需要追踪偏移量。

在Kafka中,有一个名为_consumer_offset的主题,消费者会往里面发送消息,提交偏移量,这个时候消费者也是生产者。

当消费者挂了或者有新的消费者假如消费者组,就会触发在均衡操作,即为消费者重新分配分区。

为了能够继续之前的操作,消费者需要获取每个分区最后一次提交的偏移量。

如果提交的偏移量小于处理的最后一个消息的偏移量,会造成重复消费。比如消费者提交了 6 的offset,此时又拉取了2条数据,还没等提交,消费者就挂掉了,然后就发生了再均衡。新的消费者获取到 6 的偏移量,接着处理,这就造成了重复消费。

如果提交的偏移量大于处理的最后一个消息的偏移量,会造成数据丢失。比如消费者一次性拉取了 88 条数据,并且提交了偏移量,还没处理完就宕机了,新的消费者获取 88 的偏移量,继续消费,就造成了数据丢失。

因此,如何提交偏移量对客户端影响很大,稍有不慎就会造成不好的影响。

在Kafka中,有几种提交偏移量的方式。

自动提交

这种提交方式有两个很重要的参数:

enable.auto.commit=true(是否开启自动提交,true or false)

auto.commit.interval.ms=5000(提交偏移量的时间间隔,默认5000ms)

这种方式最容易造成数据丢失以及重复消费。

通过CommitSync()方法手动提交当前偏移量

在处理完所有消息后提交,前提要把enable.auto.commit设置为false。

    ConsumerRecords<String, String> records = consumer.poll(100);
    for(ConsumerRecords<String, String> record: records){
        System.out.println("topic=%s, offset=%s,partition=%s",
                          record.topic(), record.offset(),record.partition());
    }
    try{
        consumer.commitSync();
    } catch(Exception e){
        log.error(e);
    }
}

消费者通过poll方法轮询获取消息,poll里的参数是一个超时时间,用于控制阻塞的时间,如果没有数据则会阻塞这么久,如果设置为0则会立即放回。

使用这种方法一定要在处理完所有记录后调用CommitSync()方法,避免数据丢失。如果发生错误,会进行重试。

异步提交

CommitSync() 提交偏移量的方式会造成阻塞,即需要等客户端处理完所有消息后才提交偏移量,限制了吞吐量。因此可以使用异步提交的方式,通过调用commitAsync()方法实现。

   ConsumerRecords<String, String> records = consumer.poll(100);
   for(ConsumerRecords<String, String> record: records){
       System.out.println("topic=%s, offset=%s,partition=%s",
                         record.topic(), record.offset(),record.partition());
   }
   consumer.commitAsync();
}

提交偏移量后就可以去做其他事了。CommitSync()方式发生错误会重试,但CommitAsync()不会。

之所以不重试,是因为有可能在收到broker响应前有其它偏移量提交了。

试想一下,如果会重试的话,当提交 66 的偏移量时发生网络问题,与此同时提交了 88 的偏移量,这时候刚好网络又通了,然后 88 的偏移量就提交成功了,然后 66 就重试,成功后又变成 66 了,就有可能造成重复消费。

之所以说这个问题,是因为异步提交支持在broker响应时回调,常被用于记录错误或生成度量指标。如果用他重试的话一定要注意提交的顺序。

    ConsumerRecords<String, String> records = consumer.poll(100);
    for(ConsumerRecords<String, String> record: records){
        System.out.println("topic=%s, offset=%s,partition=%s",
                          record.topic(), record.offset(),record.partition());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,          Exception e){
            if(e != null){
                log.error("Error");
            }
        }
    });
}
异步与同步组合提交

如果发生在关闭消费者或者再均衡前的最后一次提交,就需要确保其成功。

因此在消费者关闭前一般会通过组合使用的方式确保其提交成功。

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for(ConsumerRecords<String, String> record: records){
            System.out.println("topic=%s, offset=%s,partition=%s",
            record.topic(),record.offset(),record.partition());
        }
        consumer.commitAsync();
    }
}catch(Exception e){
    log.error(e);
}finally {
    try {
        consumer.commitSync();
    }
    finally{
        consumer.close();
    }
}
提交特定偏移量

commitSync() 和 commitAsync() 方法一般是在处理完一个批次后提交偏移量。如果需要更频繁的提交偏移量,需要在处理的过程中间提交的话,消费者 API 允许在调用 commitSync()和 commitAsync () 方法时传进去希望提交的分区和偏移量的 map

int count = 0;      
try {
    while(true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()){
           continue;
        }
        for (ConsumerRecord<String, String> record : records){
            System.out.println("topic=%s, offset=%s,partition=%s",
                record.topic(),record.offset(),record.partition());
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset(), "no metadata"));
            // 每处理完1000条消息后就提交偏移量
            if (count%1000==0) {
                consumer.commitAsync(currentOffsets, null);
            }
            count++;
        }
    }
} finally {
    try{
        consumer.commitSync();
    } finally{
        consumer.close();
    }
}

消费者分区分配策略

分区会被分配给消费者组里的消费者进行消费,在Kafka种可以通过配置参数partition.assignment.strategy选择分区分配策略。

  • Range 范围分区
    假设现在有10个分区,消费者组里有3个消费者。
    分区数量 10 除以消费者数量 3 取整(10/3)得 3,设为 x;分区数量 10 模 消费者数量 3(10%3)得 1,设为 y
    则前 y 个消费者分得 x+1 个分区;其余消费者分得 x 个分区。
  • RoundRobin 轮询分区
    假设有10个分区,3个消费者,第一个分区给第一个消费者,第二个给第二个消费者,第三个分区给第三个消费者,第四个给第一个消费者... 以此类推

转载:公众号“大数据的奇妙冒险”

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容