Kafka是一个消息中间件,这是很多人对Kafka的第一印象,而Apache Kafka将自己定位为一个分布式的流处理平台(A distributed streaming platform),其实纵观Kafka发展的历史,Kafka是从消息引擎起家的,但是发展至今,它已经不只是消息引擎了。针对“What is Kafka?”这个问题,可以用一句话概括为,Apache Kafka是一个消息引擎系统,也是一个分布式的流处理平台,除此之外,Kafka还能被用作分布式储存系统。具体来说,在0.10.0.0版本之前,Kafka社区将其定位为一个分布式、分区化且带备份功能的提交日志服务,Kafka社区在0.10.0.0版本正式推出了流处理组件Kafka Streams,从这个版本开始Kafka变身成为分布式的流处理平台,至于作为分布式储存系统,这个鲜有在生产环境使用的案例。
Topic: 主题是消息的逻辑容器;
Broker: 一个broker代表一台Kafka服务器,一个Kafka集群由多个broker组成;
Partition:每个topic分为多个分区,多个分区分布在不同的broker上,每个分区是一组有序的消息日志;
Producer:生产者,向主题发送消息应用程序;
Consumer:消费者,从主题订阅消息并消费的应用程序;
Replica:副本,每个分区都有一个或者多个副本,副本主要用作数据冗余和failover,副本分为Leader和Follower,Leader提供读写服务,Follower负责从Leader副本来去日志消息;
Record:消息,Kafka处理的主要对象;
Consumer offset:消费者位移,用于表征消费者的消费进度;
Consumer group:多个消费者实例公用一个groupId构成一个消费组,可以同时消费多个分区的消息;
Rebalance:消费者组重新分配订阅主题分区的过程,是Kafka消费端实现高可用的重要手段
1)为何要分区
分区提供负载均衡的能力,实现了系统的高伸缩性,不同的分区可以放置到不同节点的机器上,数据的读写操作都是针对分区粒度进行的,这样每个节点的机器都能独立的执行各自分区的读写操作,还能够通过增加节点的方式来提高系统的吞吐量。
2)分区策略,各有什么特点
常见的分区策略包括:轮询策略(Round-robin)、随机策略(Randomness)、按消息key保存策略(Key-ordering)等
轮询策略指的是将消息顺序发送到不同的分区,比如一个topic有两个分区,那么第一条消息分送到分区0,第二条消息发送到分区1,第三条消息发送到分区0,以此类推,轮询策略具有非常优秀的负载均衡表现,能够让消息最大限度的均匀分布在不同的分区上,这是Kafka Java API生产端默认的分区策略;随进策略的思想是每次发送的消息时生成一个代表分区的随机数,力求将消息均匀的分布在各个分区,其负载均衡表现没有轮询策略稳定;按消息key保存策略是求取key的哈希值对分区数取模来确定消息发送到那个分区,Kafka允许对每条消息定义消息key,这个key一般是一个有着明确业务含义的字符串,用于表示不同的业务逻辑,这种策略能够保证同一个key的消息进入同一个分区,从而保证消息的顺序性。
Kafka默认的分区策略是,如果消息指定了key,则按照消息key保存策略进行分区,如果没有指定key,则按照轮询策略进行消息分区。除了上述三种常用的分区策略之外,Kafka提供了接口可以实现定制化的分区策略,比如通过消息所在的区域来进行分区来降低网络时延。
Kafka对已提交的消息作有限度的持久化保证,首先要理解什么是已提交的消息,当若干个Broker成功接收到一条消息并写入日志文件后,会告诉生产者这条消息已成功提交,此时这条消息才是“已提交”的消息,有限度指的是保存消息的分区至少有一个存活。
消息丢失分为生产端消息丢失和消费端消息丢失
1)生产端消息丢失
目前生产者API发送消息是异步的,
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
这个方法是立即返回的,但是我们不能认为此时消息发送成功了,如果网络抖动或者消息太大超过了broker的承受能力等原因都可能导致消息发送失败,对于Kafka来说这条消息不是“已提交”的消息,故不能保证它的持久性,如果没有重试的补救措施,那么消息就“丢失”了。
2)消费端消息丢失
Consumer端用offset即位移的概念来表示消费者当前消费到的Topic分区的位置,下图清晰的展示了Consumer端的位移数据。
这个位移就像是我们读书时用到的书签一样,书签标记了我们读书的位置,更新书签的正确方式应该是先看书然后更新书签的位置,如果顺序颠倒了,即先更新了书签,然后看书,若看书过程中被打断,下次过来从书签的位置开始看书,就有可能丢失前面的内容,Kafka消费端消息丢失的场景和这个看书的场景是一样的。我曾今遇到一个线上消费端丢失消息的场景,采用的是自动提交的方式,消费端开启线程池来消费消息,消息放在JVM的阻塞队列中排队消费,当JVM进程发生OOM等异常重启时,阻塞队列中的消息就全部丢失了。
Kafka无消息丢失的最佳实践
- 永远使用带有回调函数的send方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
- 设置acks=all,acks是Producer端的参数,代表了对“已提交消息”的定义,设置为all表示所有副本都要接收到消息,该消息才算“已提交”
- 设置retries为一个比较大的值,retries是Producer端的参数,表示发送失败后的重试次数,可以一定程度的应对网络抖动等原因造成的消息发送失败
- 设置unclean.leader.election.enable=false,控制不允许Unclean领导者选举
什么是Unclean领导者选举?对于一个消息分区来说,kafka内部会维护一个ISR副本集合,正常情况下,这个副本集合至少包含一个副本,即Leader副本,ISR集合为空时,说明Leader副本不能对外提供服务,此时需要进行Leader选举,Kafka把所有不在ISR副本集合中的其他存活副本统称为非同步副本,通常来说,非同步副本可能落后Leader副本太多,如果选择这些副本中的一个作为新的Leader副本,可能会造成数据丢失,unclean.leader.election.enable参数控制是否允许进行Unclean领导者选举,如果设置为开启,可以提高系统的可用性,不会对外停止服务,如果设置为关闭,提高了分布式系统的一致性,根据CAP理论,Kafka把C和A的选择权交给用户。建议设置为关闭,因为还有很多途径实现高可用,数据丢失可能会造成一些无法挽回的损失。 - 设置replication.factor>=3,这个是broker端的参数,设置合理的副本数,通过消息冗余防止消息丢失
- 设置min.insync.replicas>1,broker端的参数,规定消息至少要写入多少个副本才算是“已提交”
- 确保replication.factor>min.insync.replicas,如果replication.factor=min.insync.replicas,那么只要有一个副本挂掉了,整个分区就无法正常工作了,可用性差
- 设置enable.auto.commit=false,采用手动提交的方式,确保消息消费完成之后再提交
消费者组(Consumer Group)是Kafka提供的可扩展且具有容错性的消费机制,多个消费者实例共用一个GroupId构成一个消费组,组内的所有消费者协调在一起消费订阅主题的所有分区,消费组有三个特性:
- Consumer Group小可以有一个或者多个消费者实例,实例可以是单独的进程,也可以是同一个进程下的不同线程;
- Group ID是一个字符串,在Kafka集群中,它能够唯一标示一个消费组;
- 单个分区只能被消费组中的一个消费者实例消费,当然它能够被其它消费组的消费者消费。
消费者组是Kafka中比较亮眼的设计,Kafka用消费者组的机制实现了消息引擎系统的两大模型,即点对点的模型和发布订阅模型,如果所有实例属于同一个Group,那么它实现的是点对点的消息队列模型,如果所有实例属于不同的消费组,那么它实现的发布订阅模型。
Rebalance本质上是一种协议,规定了消费组的所有消费者实例如何达成一致,来分配订阅的topic的所有分区。Rebalance的触发条件有三个: - 消费组成员发生变更,如新加入消费者实例,或者某个消费者实例异常崩溃被踢出消费组;
- 订阅的主题数发生变化;
- 订阅主题的分区数发生变化。
Rebalance的通知机制是通过心跳线程来完成的,当协调者决定开启新一轮重平衡之后,会将“REBALANCE_IN_PROCESS”封装到心跳请求的响应中,返回给消费者实例,当消费者实例发现心跳响应中包含“REBALANCE_IN_PROCESS”,就知道重平衡开始了。消费端参数heartbeat.interval.ms的作用是控制重平衡通知的频率,如果想要消费者实例快速的得到Rebalance的通知,可以将该参数设置为一个很小的值。Rebalance的过程对消费过程有极大地影响,类似于JVM垃圾收集器中的Stop the world,Rebalance的过程中,消费组的所有消费者实例都会停止消费,等待Rebalance完成,应该尽量避免Rebalance的发生。
副本机制指的是分布式系统在多态网络互连的机器上保存有相同的数据拷贝,副本机制的好处包括:
- 提供数据冗余,即使部分组件失效,系统依然能够正常运行,提高了系统的可用性和数据持久性;
- 提供高伸缩性,支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高系统读操作的吞吐量;
- 改善数据局部性,将数据放在离用户比较近的地方,降低网络时延。
对于Kafka而言只能享受到第一个好处,它是Kafka确保系统高可用和数据持久性的重要基石,Kafka采用的是基于领导者(Leader-based)的副本机制,工作原理如下图所示
在Kafka中副本分为领导者副本(Leader Replica)和跟随者副本(Follower Replica),每个分区都要选举一个领导者副本,其余的副本则为追随者副本,Kafka中的Follower Replica是不对外提供读写服务的,任何一个Follower Replica都不会响应生产端和消费端的读写请求,所有的读写请求都发生在Leader Replica所在的Broker,Follower Replica的唯一任务就是从Leader Replica上异步拉取日志消息写入到自己的日志文件中,从而实现与Leader Replica的同步。当Leader Replica挂掉之后,会开启新的一轮领导者的选举从Follower Replica中选一个作为新的Leader,这里需要强调的一点是不是所有的Follower都有机会成为Leader的,只有和老的Leader保持同步的Follower才能参加选举,Kafka引入In-sync Replicas(ISR)来表示保持同步的一组Replica,需要明确的是Leader Replica天生在ISR中,也就是说ISR不仅仅是Follower副本的集合,正常情况下,ISR必然包括Leader副本。
Kafka是通过Broker端的参数replica.lag.time.max.ms来判断某个副本是否处于同步的状态,Follower副本能够落后Leader副本的最长时间间隔,默认为10s,若Follower副本落后Leader副本的时间不超过10s,则认为该Follower副本是和Leader副本同步的,如果超过10s,则认为Follower副本是非同步的,会被踢出ISR副本集合。
- 顺序读写
Kafka采用顺序读写磁盘的方式 - 消息分区
分区提供负载均衡的能力,实现了系统的高伸缩性,不同的分区可以放置到不同节点的机器上,数据的读写操作都是针对分区粒度进行的,这样每个节点的机器都能独立的执行各自分区的读写操作,还能够通过增加节点的方式来提高系统的吞吐量。 - 零拷贝
如果线上Kafka服务器选用的是Linux操作系统,那么Kafka能够享受到零拷贝所带来的快速数据传输特性,Kafka客户端底层使用了Java NIO中的Selector,其在Linux上的实现是epoll,而在Windows操作系统上实现是select,所以线上操作系统的选型推荐Linux - 数据压缩
秉承时空换空间的思想,数据压缩就是用CPU时间去换磁盘空间或者网络IO的传输量,希望以较小的CPU开销带来更小的磁盘占用或者更少的网络IO传输。 - 批量发送
线上Kafka的性能调优主要从四个层次和两个方面来考虑,四个层次分别是操作系统层,JVM层,框架层和应用程序层。对操作系统的优化主要包括操作系统选型和一些系统参数设置,如我们推荐使用Linux操作系统,建议将ulimit -n设置为一个很大的值,例如65535,来避免Too many files open这类的错误,建议将vm.max_map_count设置为一个较大的值,避免碰到OutOfMemoryError: Map failed的严重错误等,Kafka进程是普通的JVM进程,所以JVM层面的调优同样重要,主要包括堆的大小设置,建议将JVM堆大小设置在6~8G,其次是垃圾回收算法设置,建议设置为G1。框架层的调优主要是合理设置Kafka集群的各种参数。应用层调优指优化Kafka客户端应用程序代码,比如使用合理的数据结构或者复用构建成本大的对象等。
两个方面是从Kafka性能调优的目标来说的,分别是吞吐量和时延。
1)如何提高吞吐量
- Broker端
适量增加num.replica.fetchers的参数值,但是不要超过CPU核数,该参数表示Follower用多少个线程从Leader上拉取消息,如果CPU资源充足,适当增大该值能够提高Producer端的吞吐量;
调优JVM参数减少full GC; - Producer端
适当增加batch.size和linger.ms参数值,使得Producer一次发送更多的消息
设置compression.type=lz4或zstd;
设置ack=0或1;
设置retries=0;
如果多线程共享同一个Producer,就增加buffer.memory参数值; - Consumer端
采用多Consumer进程或者线程同时消费数据;
增加fetch.min.bytes参数值
2)如何降低时延
- Broker端
适量增加num.replica.fetchers的参数值,但是不要超过CPU核数; - Producer端
设置linger.ms=0;
不启用压缩,设置compression.typr=none;
设置acks=1; - Consumer端
设置fetch.min.bytes=1