Kafka不是完全同步,也不是完全异步,是一种特殊的ISR(In Sync Replica)。
ISR(in-sync replica) 就是 Kafka 为某个分区维护的一组同步集合,即每个分区都有自己的一个 ISR 集合,处于 ISR 集合中的副本,意味着 follower 副本与 leader 副本保持同步状态,只有处于 ISR 集合中的副本才有资格被选举为 leader。
Kafka的Replica
- kafka的topic可以设置有n个副本(replica),副本数最好要小于等于broker的数量,也就是要保证一个broker上的replica最多有一个。
- 创建副本的单位是topic的分区,每个分区有1个leader和0到n-1follower,Kafka把多个replica分为Lerder replica和follower replica。
- 当producer在向topic partition中写数据时,根据ack机制,默认ack=1,只会向leader中写入数据,然后leader中的数据会复制到其他的replica中,follower会周期性的从leader中pull数据,但是对于数据的读写操作都在leader replica中,follower副本只是当leader副本挂了后才重新选取leader,follower并不向外提供服务。
Kafka ISR机制
ISR副本: 就是能跟首领副本基本保持一致的跟随副本,如果同步的速度太慢的话,就会被踢出ISR副本。
副本同步:
LEO(last end offset):日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新 LEO 值。如果LE0 为2的时候,当前的offset为1。
HW(high watermark):高水印值,HW 一定不会大于 LEO 值,小于 HW 值的消息被认为是“已提交”或“已备份”的消息,并对消费者可见。
producer向leader发送消息,之后写入到leader,leader在本地生成log,之后follow从leader拉取消息,follow写入到本地的log中,会给leader返回一个ack信号,一旦收到了ISR中的所有的ack信号,就会增加HW,然后leader返回给producer一个ack。
Kafka的复制机制
kafka 每个分区都是由顺序追加的不可变的消息序列组成,每条消息都一个唯一的offset 来标记位置。
kafka中的副本机制是以分区粒度进行复制的,在kafka中创建 topic的时候,都可以设置一个复制因子(replica count),这个复制因子决定着分区副本的个数,如果leader 挂掉了,kafka 会把分区主节点failover到其他副本节点,这样就能保证这个分区的消息是可用的。leader节点负责接收producer 发过来的消息,其他副本节点(follower)从主节点上拷贝消息。
[站外图片上传中...(image-31f1f9-1614765714779)]
kakfa 日志复制算法提供的保证是当一条消息在producer端认为已经committed的之后,如果leader 节点挂掉了,其他节点被选举成为了 leader 节点后,这条消息同样是可以被消费到的。
关键配置: unclean.leader.election.enable
Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss
Type: boolean
Default: false
Valid Values:
Importance: high
Update Mode: cluster-wide
默认为 false
, 即允许不在isr中replica选为leader,这个配置可以全局配置,也可以在topic级别配置。
这样的话,leader选举的时候,只能从ISR集合中选举,集合中的每个点都必须是和leader消息同步的,也就是没有延迟,分区的leader 维护ISR 集合列表,如果某个点落后太多,就从 ISR集合中踢出去。
producer 发送一条消息到leader节点后, 只有当ISR中所有Replica都向leader发送ACK确认这条消息时,leader才commit,这时候producer才能认为这条消息commit了,正是因为如此,kafka客户端的写性能取决于ISR集合中的最慢的一个broker的接收消息的性能,如果一个点性能太差,就必须尽快的识别出来,然后从ISR集合中踢出去,以免造成性能问题。
如何判断副本不会被移除ISR集合?
replica.lag.max.messages
: follower副本最大落后leader副本的消息数。(0.9.0.0版本后移除)。
replica.lag.time.max.ms
: 不仅指自从上次从副本获取请求以来经过的时间,而且还指自上次捕获副本以来的时间。
设置replica.lag.max.messages
为3,只要 follower 只要不落后leader 大于2条消息,就然后是跟得上leader的节点,就不会被踢出去。
设置 replica.lag.time.max.ms 为 300ms, 意味着只要 follower 在每 300ms内发送fetch请求,就不会被认为已经dead ,不会从ISR集合中踢出去。
结语
Replica的目的就是在发生意外时及时顶上,leader失效后,就需要从follower中马上选一个新的leader 。选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的,从他们中间选取可以保证数据完整 。
但如果不幸ISR列表中的follower都不行了,就只能从其他follower中选取,这时就有数据丢失的可能了,因为不确定这个follower是否已经把leader的数据都复制完成了。
还有一种极端情况,就是所有副本都失效了,这时有两种方案:
等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定 。
选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整。
Kafka支持通过配置选择使用哪一种方案,可以根据可用性和一致性进行权衡。