kafka的消息可用性基于它的replication(副本)和leader/flower策略:
kafka中每一个topic的partition有N个replication,其中一个replication所在的broker称为该partition的leader,其他为flower。
如下图中,Kafka集群中有4个broker, 某topic有3个partition(p1,p2,p3),且复制因子即副本个数也为3:
leader会去维护ISA(副本同步队列)的列表
producer send数据到leader后,ISA中的副本会从leader同步数据,所以就会出现一个问题:本次produce什么情况下才算成功,什么情况下又需要重试?
一般通过设置副本同步的数量来确定produce是否重试,可能的情况有:
1、leader收到数据并保存后就返回成功
这种情况下如果副本还没同步数据,leader就挂掉了,数据就会丢失
2、所有副本都成功同步数据后才返回成功
这种情况下数据一定不会丢失,但是如果只有部分副本同步完成时leader就挂掉了,这时候会重新进行leader选举,producer也会重新发送数据,这样数据可能会重复。(在gn项目中采用本地数据库保存了produce失败的数据,这个和kafka这里是不冲突的,上面说的情况是leader接受到数据了,而gn中那个是防止gn的server问题导致消息没有发出去)
所以kafka中有3种可能传输模式:
At most once: 消息可能会丢,但绝不会重复传输
At least once:消息绝不会丢,但可能会重复传输
Exactly once:每条消息肯定会被传输一次且仅传输一次
上面的情况什么时候会出现呢?
1、Kafka的消息传输保障机制非常直观。当producer向broker发送消息时,一旦这条消息被commit,由于副本机制(replication)的存在,它就不会丢失。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中,所以目前Kafka实现的是at least once。
2、consumer从broker中读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然也可以将consumer设置为autocommit,即consumer一旦读取到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了exactly once, 但是如果由于前面producer与broker之间的某种原因导致消息的重复,那么这里就是at least once。
3、考虑这样一种情况,当consumer读完消息之后先commit再处理消息,在这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于at most once了。
4、读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于at least once。
Kafka只保证produce的消息不漏,即at lease once,而不保证produce和consume的消息不重。
重复发送:这个客户端解决不了,需要服务器判重,代价太大。
重复消费:有了上面的commitSync(),我们可以每处理完1条消息,就发送一次commitSync。那这样是不是就可以解决“重复消费”了呢?就像下面的代码:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer); //消除处理,存到db
consumer.commitSync(); //同步发送ack
buffer.clear();
}
}
答案是否定的!因为上面的insertIntoDb和commitSync做不到原子操作:如果在数据处理完成,commitSync的时候挂了,服务器再次重启,消息仍然会重复消费。
那这个问题有什么解决办法呢?
答案是自己保存committed offset,而不是依赖kafka的集群保存committed offset,把消息的处理和保存offset做成一个原子操作。
在kafka的官方文档中,列举了以下2种自己保存offset的使用场景:
1、关系数据库,通过事务存取。consumer挂了,重启,消息也不会重复消费
2、搜索引擎:把offset跟数据一起,建在索引里面
kafka的leader选举:
Kafka在Zookeeper中为每一个partition动态的维护了一个ISR(In-Sync Replicas)(所有的副本统称为Assigned Replicas,即AR。ISR是AR中的一个子集,副本的相对leader的延迟在规定的阈值内就会被放进ISR中,即保证ISR中副本的数据是新的,所以有资格成为leader),对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败
如果leader挂了,从副本中选取新leader的规则是什么呢,从业务来看肯定是将目前数据最新的副本设置为leader