首先,其实raft如果你不去看理论正确性的证明,光实现的话,只要按照raft里面给出的原则写代码就ok!如果代码写出来不正确,只能是你自己实现的问题。囧
最小规则
package raft
// 所有server的原则 Rules for Servers
// 1. 如果commitIndex > lastApplied:则递增lastApplied,应用 log[lastApplied] 到状态机之中
// 2. 如果Rpc请求或回复包括纪元T > currentTerm: 设置currentTerm = T,转换成 follower, 并且设置 votedFor=-1,表示未投票
// rules for Followers
// 回复 candidates与leaders的RPC请求
// 如果选举超时时间达到,并且没有收到来自当前leader或者要求投票的候选者的 AppendEnties RPC调 :转换角色为candidate
// rules for Candidates
// 转换成candidate时,开始一个选举:
// 1. 递增currentTerm;投票给自己;
// 2. 重置election timer;
// 3. 向所有的服务器发送 RequestVote RPC请求
// 如果获取服务器中多数投票:转换成Leader
// 如果收到从新Leader发送的AppendEnties RPC请求:转换成follower
// 如果选举超时时间达到:开始一次新的选举
// rules for Leaders
// 给每个服务器发送初始空的AppendEntires RPCs(heartbeat);指定空闲时间之 后重复该操作以防 election timeouts
// 如果收到来自客户端的命令:将条目插入到本地日志,在条目应用到状态机后回复给客户端
// 如果last log index >= nextIndex for a follower:发送包含开始于nextIndex的日志条目的AppendEnties RPC
// 如果成功:为follower更新nextIndex与matchIndex
// 如果失败是由于日志不一致:递减nextIndex然后重试
// 如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N
// AppendEntries RPC的实现:在回复给RPCs之前需要更新到持久化存储之上
// 有3类用途
// 1. candidate赢得选举的后,宣誓主权
// 2. 保持心跳
// 3. 让follower的日志和自己保持一致
// 接收者的处理逻辑:
// 1. 如果term < currentTerm 则返回false
// 2. 如果日志不包含一个在preLogIndex位置纪元为prevLogTerm的条目,则返回 false
// 该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
// 3. 如果一个已存在的条目与新条目冲突(同样的索引但是不同的纪元),则删除现存的该条目与其后的所有条
// 4. 将不在log中的新条目添加到日志之中
// 5. 如果leaderCommit > commitIndex,那么设置 commitIndex = min(leaderCommit,index of last new entry)
// RequestVote RPC 的实现: 由候选者发起用于收集选票
// 1. 如果term < currentTerm 则返回false
// 2. 如果本地的voteFor为空或者为candidateId,
// 并且候选者的日志至少与接受者的日志一样新,则投给其选票
// 怎么定义日志新
// 比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新
// 如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新
// 如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。
// 以上所有的规则保证的下面的几个点:
// 1. Election Safety 在一个特定的纪元中最多只有一个Leader会被选举出来
// 2. Leader Append-Only Leader不会在他的日志中覆盖或删除条 ,他只执行添加新的条
// 3. Log Matching:如果两个日志包含了同样index和term的条 ,那么在该index之前的所有条目都是相同的
// 4. Leader Completeness:如果在一个特定的term上提交了一个日志条目,那么该条目将显示在编号较大的纪元的Leader的日志里
// 5. State Machine Safety:如果一个服务器在一个给定的index下应用一个日志条目到他的状态机上,没有其他服务器会在相同index上应用不同的日志条目
以上就是全部,我们接下去挨个来看看这些规则在代码中是怎么体现的
follower
规则有两条
- 回复 candidates与leaders的RPC请求
- 如果选举超时时间达到,并且没有收到来自当前leader或者要求投票的候选者的 AppendEnties RPC调 :转换角色为candidate
对于第一条follower处理来自candidate的RequestVote RPC
和来自leader的AppendEntries RPC
两种请求
对于第二条则是说follower在规定的选举时间内没有收到来自leader的心跳,则认为leader已经不存在了,自己开始竞选leader,因此此处选举的超时时间要大于心跳时间
看下代码实现
candidate
候选者的规则有
- 转换成candidate时,开始一个选举
- 递增currentTerm;投票给自己;
- 重置election timer;
- 向所有的服务器发送 RequestVote RPC请求
- 如果获取服务器中多数投票:转换成Leader
- 如果收到从新Leader发送的AppendEnties RPC请求:转换成follower
- 如果选举超时时间达到:开始一次新的选举
根据这些规则,代码即:
其中广播后的结果是通过
voteResultChan
来传递的,而心跳则是通过heartbeatChan
来传递,在这基础上,需要有个注意的地方,candidate在具体开始这些任务的时候,需要去读voteResultChan
和heartbeatChan
,因为可能里面已经有通知了,对于voteResultChan
的需要忽略它,而对于heartbeatChan
则说明已经有leader产生了leader规则
- 给每个服务器发送初始空的AppendEntires RPCs(heartbeat);指定空闲时间之后重复该操作以防 election timeouts
- 如果收到来自客户端的命令:将条目插入到本地日志,在条目应用到状态机后回复给客户端
- 如果last log index >= nextIndex for a follower:发送包含开始于nextIndex的日志条目的AppendEnties RPC
- 如果成功:为follower更新nextIndex与matchIndex
- 如果失败是由于日志不一致:递减nextIndex然后重试
- 如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N
其中最重要的一点是6,这个第六点主要是解决如下问题:
- (a) S1 是领导者,部分的复制了索引位置 2 的日志条目
- (b) S1 崩溃了,然后 S5 在任期 3 里通过 S3、S4 和自己的选票赢得选举,然后从客户端接收了一条不一样的日志条目放在了索引2 处
- (c) S5 又崩溃了;S1 重新启动,选举成功,开始复制日志。在这时,来自任期 2 的那条日志已经被复制到了集群中的大多数机器上,但是还没有被提交
- (d) S1 又崩溃了,S5 可以重新被选举成功(通过来自 S2,S3 和 S4 的选票),然后覆盖了他们在索引 2 处的日志。但是,在崩溃之前,如果 S1 在自己的任期里复制了日志条目到大多数机器上
- (e) 然后这个条目就会被提交(S5 就不可能选举成功)。 在这个时候,之前的所有日志就会被正常提交处理
该问题是因为:当一个新Leader当选时,由于所有成员的日志进度不同,很可能需要继续复制前面纪元的日志条目,因为即使为前面纪元的日志复制到多数服务器并且提交,如步骤C,但是在D中还是可能被覆盖,这就产生了不一致。解决的方法就是通过规则6:如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N,具体就是在c阶段,S1成为Leader,此时的纪元是4。S1一样向其他服务器发送日志2,当发送到多数服务器S1,S2,S3时,此时并不提交该日志,而是继续复制日志4,直到日志4到达多数服务器后,提交日志4,即leader只会提交当前纪元的日志。如果提交了4之后宕机,S5就不会被选举为新的 Leader,如果在提交4之前宕机,那么日志2,日志4还是可能被覆盖,但是由于没有提交,也就没有执行日志中的命令,即使被覆盖也无关系。
代码上就是:
不间断的发送心跳
RequestVote RPC
投票rpc的规则
// 1. 如果term < currentTerm 则返回false
// 2. 如果本地的voteFor为空或者为candidateId,
// 并且候选者的日志至少与接受者的日志一样新,则投给其选票
// 怎么定义日志新
// 比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新
// 如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新
// 如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。
需要注意的是日志新的处理,实现
其中红色是通用规则
如果Rpc请求或回复包括纪元T > currentTerm: 设置currentTerm = T,转换成 follower, 并且设置 votedFor=-1,表示未投票
这个通用规则的保证了有新纪元开始的时候,所有server都转变为follower,开始新一轮选举
AppendEntries RPC
有3类用途
- candidate赢得选举的后,宣誓主权
- 保持心跳
- 让follower的日志和自己保持一致
接收者的处理逻辑:
- 如果term < currentTerm 则返回false
- 如果日志不包含一个在preLogIndex位置纪元为prevLogTerm的条目,则返回 false,该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
- 如果一个已存在的条目与新条目冲突(同样的索引但是不同的纪元),则删除现存的该条目与其后的所有条
- 将不在log中的新条目添加到日志之中
- 如果leaderCommit > commitIndex,那么设置 commitIndex =min(leaderCommit,index of last new entry)
在实现上也是完全按照上面的规则
func (rf *Raft) AppendEnties(args AppendEntiesArgs, reply *AppendEntiesReply) {
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
// 本身自己是leader,有可能收到别人的请求吗,实验中是可能的,会收到的一个大的term
//if rf.status == STATUS_LEADER {
// log.Println("I am leader, but get AppendEnties",rf.Detail())
// log.Println("args.term",args.Term,"currentTerm",rf.currentTerm)
//}
// 心跳一定来自leader
rf.heartbeatChan <- true
rf.mu.Lock()
defer rf.mu.Unlock()
rf.rpcRuleForAllServer(args.Term)
// 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false
// len(rf.log) >= args.PrevLogIndex + 1,说明本地日志长度 >= leader日志长度
if len(rf.log) >= args.PrevLogIndex + 1 && rf.log[args.PrevLogIndex].Term == args.PrevLogTerm {
// 该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
for i:=0;i<len(args.Entries);i++ {
if args.PrevLogIndex+1+i < len(rf.log){
if rf.log[args.PrevLogIndex+1+i] != args.Entries[i]{
// index相同,但是纪元不同
rf.log = rf.log[:args.PrevLogIndex+1+i]// 之前的还是相同的,再加上本条之后的
rf.log = append(rf.log,args.Entries[i:]...)
break
}
}else {
// 本条目不存在
rf.log = append(rf.log,args.Entries[i:]...)
break
}
}
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(args.LeaderCommit,len(rf.log)-1)
}
// !!为了查明为什么日志错了
rf.checkLog(args)
//在回复给RPCs之前需要更新到持久化存储之上
rf.persist()
reply.Term = rf.currentTerm
reply.Success = true
reply.NextIndex = len(rf.log)
return
}else {
reply.Term = rf.currentTerm
reply.Success = false
reply.NextIndex = min(rf.commitIndex+1,args.PrevLogIndex-1)//
return
}
}
以上就是所有规则和代码对应的实现了,github地址