本文主要内容:
①kafka复制机制
②分区leader副本宕掉怎么选新的leader
③高水位与leader epoch的详细分析。
④一些相关配置
Kafka复制机制
Kafka的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka可以保证分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。每个分区可以有多个副本,其中一个副本是leader副本。所有的生产者请求和消费者请求都经过leader副本,leader副本以外的副本都是follower副本,follower副本不处理来自客户端的请求,它们唯一的任务就是从leader副本那里复制消息,保持与leader副本一致的状态。如果leader 副本发生崩溃,其中的一个follower副本会被提升为新的leader副本。
当然,由于网络问题、broker崩溃等导致follower副本复制滞后,这时它肯定不能成为新的leader副本。那么符合哪些条件的follower副本才可以成为leader副本呢?答案是“同步副本”,同步副本是满足如下条件的副本:
1.leader副本是同步副本
2.与zookeeper之间有一个活跃的会话,也即在过去6S(可配置)内向zookeeper发送过心跳。
3.在过去的10S内(可配置)从leader副本那里获得过信息。
4.在过去10S内从leader副本那里获取过最新的信息。(光从leader那里获取信息是不够的,还必须是几乎零延迟的)
leader会跟踪与其保持同步的副本列表,该列表称为ISR(即in-sync Replica)。如果一个follower宕机,或者落后太多,leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(该值可在server.properties中通过replica.lag.max.messages配置,其默认值是4000)或者Follower超过一定时间(该值可在server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。
举例说明:
上图的Kafka集群,有个topic1的主题,有3个分区,每个分区副本为3。其中红色的是leader副本,绿色的是follower副本。这里涉及到副本分配算法,如下:
- 将所有N Broker和待分配的i个Partition排序.
- 将第i个Partition分配到第(i mod n)个Broker上.
- 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.
在本例中N=4,i=0,1,2。可以看到符合副本分配算法。
Kafka 的LEO和HW
首先介绍Kafka所有副本都有的两个重要属性:LEO和HW。
- LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。
- High Watermark(高水位线)以下简称HW,表示消息被leader和ISR内的follower都确认commit写入本地log,所以在HW位置以下的消息都可以被消费(不会丢失)。
举个例子,正常情况下,如下图:
当follower可能由于IO耗时过高,经历了一次full gc耗时很多时,达到了不满足同步副本的条件,follower被移除了ISR列表。此时的HW还是6,leader上的LEO是10。消费者只能消费offset到6。
等待broker回复正常,又加入ISR列表。所以ISR列表是动态的。维护好ISR列表,就能保证leader不可用时,可以从ISR中选一个replication作为leader继续工作。
Leader Epoch 与 高水位的讨论
0.11版本之前,0.11之前副本备份机制主要依赖水位(或水印)的概念,而0.11采用了leader epoch来标识备份进度。
首先说一下Kafka的复制协议:
Kafka复制协议有两个阶段,第一阶段,follower从leader获取到消息;第二阶段,在下一轮的RPC中向leader发送fetch request确认收到消息。假设其他的follower也都确认了,那么leader会更新HW,并在接下来的RPC中响应给follower。
同时,在重启一个follower时,这个follower可能会把日志截断到HW处(意味着此follower将会删除一些消息),然后从leader获取消息。
正式有与Kafka的复制协议分两个阶段,导致使用高水位会出现数据丢失和数据不一致的问题,下面我们分别讲一下这两种问题:
①数据丢失【Scenario 1: High Watermark Truncation followed by Immediate Leader Election】
假设有A、B两个Broker,初始时B为leader,A从B中取到消息m2,所以A中有消息m2了,但是由于在下一轮RPC中,A才会更新自己的HW,所以此时A的HW没变。如果这时候A重启了,他截取自己的日志到HW并发送一个fetch request到B。不幸的是,B这时宕机了,A成了新的leader,那么此时消息m2就会永久的丢失了。
②数据不一致:【Scenario 2: Replica Divergence on Restart after Multiple Hard Failures】
假设我们有两个Broker,初始时A是leader,B是follower。A接收到m2消息,但B还没来得及复制时,断电了。过了一会,B重启了,成为了leader,接收了m3消息,HW+1。然后A重启了,截断日志到高水位,但是此时的消息却出现了不一致。
在0.11版本使用leader epoch解决这两个问题。
Leader Epoch: 32位,单调递增的数字。代表单个分区所处的leader时代。每发生一次leader转换,就+1。例如leader epoch =2,说明处于第二leader时代。
Leader Epoch Start Offset:该epoch版本的leader写入第一条消息的位移。
Leader Epoch Sequence File: 存储Leader Epoch和Leader Epoch Start Offset对,类似如下形式:
0,100
1,200
3,500
我们说一下Leader epoch的工作步骤,然后在分析上面两个例子。
- Evolve the message format so that every message set carries a 4-byte Leader Epoch number.
使每个消息都包含一个4字节的Leader Epoch number - In every log directory, we create a new Leader Epoch Sequence file, where we store the sequence of Leader Epoch and the Start Offset of messages produced in that epoch. This is cached in every replica, in memory as well.
在每个log目录,创建Leader Epoch Sequence file用来存储Leader Epoch和Start Offset。 - When a replica becomes a leader, it first adds the new Leader Epoch and the log end offset of the replica to the end of Leader Epoch Sequence file and flushes it *. Each new message set produced to the leader is tagged with the new Leader Epoch.
当一个副本成为leader,它首先在Leader Epoch Sequence file 末尾添加一条新的记录,并把他flush到磁盘。每条新的消息就会被新的Leader Epoch标记。 - When a replica becomes a follower, it does the following steps:
当一个副本成为follower时(比如重启),它会做以下事情:
4.1 Recover all Leader Epochs from the Leader Epoch Sequence file, if needed.
从Leader Epoch Sequence file恢复所有的Leader Epoch。我的理解是,比如宕机太久,这期间换了好几次leader,那么就要把这些leader时代的消息都恢复过来。
4.2 Send a new LeaderEpoch request for the partition to the leader. The request includes the latest Leader Epoch in the follower's Leader Epoch Sequence.
向分区leader发送一个LeaderEpoch请求,请求包含了该follower的 Leader Epoch Sequence文件中最新的Leader Epoch。
4.3 The leader responds with the LastOffset for that LeaderEpoch. LastOffset will be the start offset of the first Leader Epoch larger than the Leader Epoch passed in the request or the Log End Offset if the leader's current epoch is equal to the one requested.
Leader向follower返回对应LeaderEpoch的LastOffset。这个LastOffset有两种可能,一种是比follower发送的请求中的Leader Epoch大1的开始offset,另一种是如果当前leader epoch与请求中的leader epoch相等,那么就返回当前leader的LEO。
4.4 If the follower has any LeaderEpoch with a start offset larger than the LastOffset returned from the leader, it resets its last Leader Epoch Sequence to be the leader’s LastOffset and flushes the Leader Epoch Sequence File.
如果当前follower有任何LeaderEpoch的开始偏移量大于从leader中返回的LastOffset 。那么他会重置Leader Epoch Sequence来和leader保持一直。
4.5 The follower truncates its local log to the leader’s LastOffset.
follower截断local log 到leader的LastOffset位置。
4.6 The follower starts fetching from the leader.
follower开始从leader获取数据
4.7.1 During fetching, if the follower sees the LeaderEpoch of a message set larger than its latest LeaderEpoch, it adds the new LeaderEpoch and the starting offset to its LeaderEpochSequence and flushes the file to disk.
在获取数据时,如果follower发现消息中的LeaderEpoch比自己的最新的LeaderEpoch 大,他会添加这个LeaderEpoch和开始偏移到LeaderEpochSequence文件,并刷写到磁盘。
4.7.2 The follower proceeds to append the fetched data to its local log.
follower继续获取数据。
For backward compatibility, in step 4.3, if the leader can't find the LastOffset (e.g., the leader hasn't started tracking Leader Epoch yet), the follower will fall back to the current approach by truncating the log to its high watermark.
那么我们用leader epoch的方式分析一下上面两个例子。
①解决数据丢失:
A重启后,向B发送LeaderEpochRequest请求,此时由于Leader Epoch相等,所以B返回给A的是B的LEO。此时B挂掉,A接收到LEO=2,与自己相同,所以不会截断,A成为leader,在文件中添加新的Leader epoch 和开始偏移即可。
②解决数据不一致问题:
A、B宕机后,B先重启成为leader,此时Leader Epoch由0变为1,并且LE的开始偏移为1。此时A重启,向B发送自己宕机时所处的Leader Epoch,也就是0。此时B返回LE1的开始偏移1。A发现自己的数据大于此值,便会截断自己的日志。从而保证数据的一致性。
相关配置
生产者端
生产者生产消息的时候,通过request.required.acks
参数来设置数据的可靠性。
request.required.acks=0
发过去就完事了,不关心broker是否处理成功,可能丢数据。
request.required.acks=1
当写Leader成功后就返回,其他的replica都是通过fetch去同步的,所以kafka是异步写,主备切换可能丢数据。
request.required.acks=-1
要等到ISR里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。
Broker级别的配置
min.insync.replicas
指定最少的ISR副本数量。