at least once //重试
at most once //seq id (冥等性)
exactly once // 重试+冥等性
//atomic writes across partitions
1 从transaction coordinator 获取事务ID(事务信息和状态需要持久化到topic中 如果transaction coordinator挂了 用来做事务回复)
2 写message到leader partition中(如果是事务消息 通过消息头部的messageType来判断是事务消息还是普通消息)3
3 更新transaction coordinator中的事务状态(commit or abort)
4 transaction coordinator写Marker(事务的元数据 commit or abort)到leader topic中 更新LSO(Last Stable Offset LSO之后的offset对consumer不可见)
5 如果事务abort了 leader partition 把失败的元数据写到abort transaction的信息文件中
6 consumer消费时 如果message是事务消息 而且在abort transaction中 丢弃这个消息
7 如果在write Marker过程中 某个broker挂了就重写 write-Marker是at least once消息 有重复也无所谓
文件存储
segment file 组成:有两大部分组成 分别为index file 和data file, 此两个文件一一对应,后缀".index"和".log"分别表示segment的索引文件和数据文件。索引文件存储消息的元数据,log文件存储消息的内容。
segment的命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset,数值最大为64位,19位数字字符长度,没有用0填充
以100.indx和100.log的文件为例
100.index 100.log
{1, 0} {(message101, 0), (message102, 239)}
{3, 500} {(message103, 500), (message104, 589), (message105, 666), (message106, 700)}
{7, 739} {(message107, 739)}
... ...
{N, postion} message100+N postion
{消息在本文件的系列号, 消息在log的物理偏移地址} {消息的全局系列号 物理偏移地址}
如何查找offset为107的message 根据文件名查找到对应的100的segment file。依次定位到100.index的元数据物理位置和100.log的物理偏移地址
Isr
Kafka结合同步复制和异步复制,使用ISR(与Partition Leader保持同步的Replica列表)的方式在确保数据不丢失和吞吐率之间做了平衡。Producer只需把消息发送到Partition Leader,Leader将消息写入本地Log。Follower则从Leader pull数据。Follower在收到该消息向Leader发送ACK。一旦Leader收到了ISR中所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。这样如果leader挂了,只要Isr中有一个replica存活,就不会丢数据。
Isr动态更新
Leader会跟踪ISR,如果ISR中一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(replica.lag.max.messages)或者Follower超过一定时间(replica.lag.time.max.ms)未向Leader发送fetch请求。ISR是AR中的一个子集,由leader维护ISR列表。Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护。
Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。
Leader来维护:leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。
选举机制
kafka中所有topic的leader选举都有controller负责。在所有的broker中选择一个作为controller,controller通过rpc的方式叫leader的变更通知broker。 controller的选举则依赖zookeeper,每个broker启动的时候都尝试去zookeeper上创建一个临时节点,只有创建成功的broker才是controller,其他的broker则watch改节点,防止controller宕机后,执行重新选举新的controller。
controller_epoch:此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1