背景
上一篇日志复制我们分析了consul leader 接受一个key value的put请求,leader经过一顿操作,把日志都发给了follower,但是还没有提交,插入的go routine 也还wait在哪里等是否成功commit了,即应用到状态机,那我们这篇文章就来说一说consul 是怎么判断过半日志复制成功的和怎么应用到状态机的。
过半提交
raft协议要求写操作,只有超过一半才能算成功,才能应用到状态机FSM, 客户端才能读到这个数据,这个过半是leader自己也算在里面的,也就是前面一篇文章我们提到的,leader在持久化log后,就标记自己写成功了,我们没有分析,现在我们来分析下这个逻辑,因为follower 处理完日志复制后,也是有这个逻辑处理的。
//这里很重要,好就才看明白,这个是log 复制成功后,最终应用到状态机的一个机制
//这里是记录下leader自己的结果,因为过半leader也算一份。
r.leaderState.commitment.match(r.localID, lastIndex)
我们上篇文章只是在这里做了一个注释,并没有分析里面怎么实现的,我们就是要搞懂到底怎么实现的,下面是match的代码:
// Match is called once a server completes writing entries to disk: either the
// leader has written the new entry or a follower has replied to an
// AppendEntries RPC. The given server's disk agrees with this server's log up
// through the given index.
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
c.matchIndexes[server] = matchIndex
c.recalculate()
}
}
注释也基本说明了这个方法的作用,就是我们上面说的,我们就不再重复了,要理解这个逻辑,先了解下这个数据结构matchIndexes,matchIndexes 是一个map,key就是server id,就是consul 集群每个节点有一个id,value就是上次应用log到状态机的编号commitIndex,我们举一个例子来说明下recalculate的逻辑:
假如集群三个节点,server id分别为1,2,3,上次写log的编号是3,就是leader和follower都成功了,这个matchIndexes的数据如下:
1(leader) --> 3
2(follower) --> 3
3(follower) --> 3
假如这个时候新来一个put请求,leader本地持久化成功,就要更新这个数据结构了matchIndexes了, 因为leader是先更新,再并发请求follower的,所以这个时候matchIndexes数据如下,因为一个log,所以logIndex是加1。
1(leader) --> 4
2(follower) --> 3
3(follower) --> 3
因为leader本地完成和follower远程完成一样,都要通过这个逻辑来判断是否commit 该log 请求,即是否应用到FSM,所以就是要判断是否过半完成了,逻辑是这样的:
先创建一个数组matched,长度为集群节点数,我们的例子是3,
然后把matchIndexes的commitIndex 起出来,放到matched中,matched的数据就是[4,3,3]
排序,为啥要排序,因为map 是无序的,下面要通过中间索引的值来判断是否变化。
然后计算 quorumMatchIndex := matched[(len(matched)-1)/2],这个就是取中间索引下标的值,也是因为这点,需要第三步排序.
比较quorumMatchIndex 是否大于当前的commitIndex,如果大于,说明满足过半的条件,则更新,然后应用到状态机。
通过上面5步,来实现了一个过半的逻辑,我们再以两个场景来理下,
假如一个follower失败了,一个成功,成功的follower会更新matched的数据是[4,3,4],或者是[4,4,3],排序后为都是[4,4,3], 第4步计算的结果是4大于3,就可以提交了,经过上面的详细,再看下面的代码就好理解了:
// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
if len(c.matchIndexes) == 0 {
return
}
matched := make([]uint64, 0, len(c.matchIndexes))
for _, idx := range c.matchIndexes {
matched = append(matched, idx)
}
//这个排序是降序,才能保证下面取中间索引位置的值来判断是否过半已经复制成功。
sort.Sort(uint64Slice(matched))
quorumMatchIndex := matched[(len(matched)-1)/2]
//如果超过一半的follower成功了,则开始commit,即应用到状态机
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
c.commitIndex = quorumMatchIndex
//符合条件,触发commit,通知leader执行apply log
asyncNotifyCh(c.commitCh)
}
}
失败的处理情况
网络失败:
则本次写就失败了,就失败了,就返回了,这里也没有看到怎么通知前面的future.wait 退出的。
数据不一致:
如果是log和follower的有gap,比如follower停了一段时间,重新加入集群,这个时候follower的log 编号很多事和leader有差距的,对这种情况,就是日志一致性的保证。
follower会响应leader自己当前的logindex 编号,leader获取到对应的编号时,会更新发送logNext,也就是从这里开始发生日志给follower,就进入重试的的流程,重新发日志。
这里consul 根据raft协议做了一个优化,raft协议描述的是每次递减一个logindex 编号,来回确认,直到找到follower匹配的编号,再开始发日志,这样性能就很差,所有基本上没有那个分布式系统是那样实现落地的。
Commit Log
只要超过一半的日志 复制成功,consul 就进入日志commit阶段,也就是将修改应用到状态机,通过recalculate 方法给leader监听的commitCh 发一个消息,通知leader开始执行apply log 到FSM, leader 的代码如下:
case <-r.leaderState.commitCh:
// Process the newly committed entries
//上次执行commit log index
oldCommitIndex := r.getCommitIndex()
//新的log需要commit的log index,在判断是过半时,会更新commitindex
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
....
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
//idx 大于commitIndex,说明是后面新写入的,还没有同步到follower的日志。
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
// Process the group
if len(groupReady) != 0 {
//应用的逻辑在这里。groupFutures 就是写入go routine wait的future
r.processLogs(lastIdxInGroup, groupFutures)
//清理inflight集合中已经commit过的log,防止重复commit
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
这里比较简单,就是从leaderState.inflight 中取出log,就是我们之前写入的,循环判断,如果log的编号大于commitIndex,说明是后面新写入的log,还没有同步到follower的log,不能提交。这里应该是有序的,lastIdxInGroup 应该就是需要commit的log的最大的一个编号。
processLogs的逻辑就是支持分批提交支持,发给consul 的runFSM的go routine,consul raft专门有一个go routine来负责commit log到状态机,支持批量和一个一个commit,我们看下单个commit的情况,代码如下:
commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change
var resp interface{}
// Make sure we send a response
defer func() {
// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}()
switch req.log.Type {
case LogCommand:
start := time.Now()
//将日志应用到FSM的关键在这里。
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
....
}
// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term
}
主要就是三点,应用log 到fsm,然后跟新下fsm的logindex和任期,最后就是要通知还在wait的go routine。
log有序
这里还补充下上篇日志复制一个重要点没有说明,就是最后发起日志复制的时候,因为raft要严格保证log的顺序性,所以发送日志除了当前新产生的log 的编号和任期,还需要带上该log的上一条log的编号和任期,follower需要检查这个,因为follower只要保证上一条也和leader的上一条匹配,即log 编号和任期term都相同,才可以,如果有一个不相同,就是follower和leader出现了gap,或者中间有其他leader写了数据,需要告诉leader直接覆盖自己本地log。
总结
到这里基本上把consul log提交的整个流程摸了一遍。我们大致盘清楚了consul raft log 从log 复制到log commit的整个过程,这中间还有一种情况没有理清楚,就是两个follower都挂了的情况,前面的写go routine 的wait的future是怎么通知到的,后面有时间再单独开一篇来说明。
目前正在看机会中,关注基础架构,中间件,架构师,技术经理等相关的机会。