Step 1: Read the paper from Section 6 onward
https://www.infoq.cn/article/raft-paper
Then move on to the 2C handout.
Task goal
If a Raft-based server reboots it should resume service where it left off. This requires that Raft keep persistent state that survives a reboot. The paper's Figure 2 mentions which state should be persistent, and raft.go contains examples of how to save and restore persistent state.
A “real” implementation would do this by writing Raft's persistent state to disk each time it changes, and reading the latest saved state from disk when restarting after a reboot. Your implementation won't use the disk; instead, it will save and restore persistent state from a Persister object (see persister.go). Whoever calls Raft.Make() supplies a Persister that initially holds Raft's most recently persisted state (if any). Raft should initialize its state from that Persister, and should use it to save its persistent state each time the state changes. Use the Persister's ReadRaftState() and SaveRaftState() methods.
Step 2: Implement persist() and readPersist()
Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or "serialize") the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder we provide to do this; see the comments in persist() and readPersist(). labgob is derived from Go's gob encoder; the only difference is that labgob prints error messages if you try to encode structures with lower-case field names.
Since the examples are already given to you in the comments, this is trivial: uncomment them and tweak a little. The only thing you really need to know is which state must be persisted.
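Since labgob is derived from Go's standard gob encoder, the encode/decode round trip can be demonstrated with encoding/gob alone. A minimal sketch (encodeState/decodeState are illustrative stand-ins for persist()/readPersist(), not the lab's API):

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Log mirrors the entry type used in raft.go.
type Log struct {
	Term    int
	Command interface{}
}

func init() {
	// gob needs the concrete type of an interface value registered;
	// here the demo commands are strings.
	gob.Register("")
}

// encodeState serializes the three persistent fields, in the same order
// decodeState will read them back. labgob works the same way, except it
// complains about lower-case (unexported) field names.
func encodeState(currentTerm, votedFor int, log []Log) []byte {
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(currentTerm)
	e.Encode(votedFor)
	e.Encode(log)
	return w.Bytes()
}

// decodeState is the inverse: decode in exactly the order encoded.
func decodeState(data []byte) (currentTerm, votedFor int, log []Log) {
	r := bytes.NewBuffer(data)
	d := gob.NewDecoder(r)
	d.Decode(&currentTerm)
	d.Decode(&votedFor)
	d.Decode(&log)
	return
}

func main() {
	data := encodeState(3, 1, []Log{{Term: 1, Command: "x"}, {Term: 3, Command: "y"}})
	term, voted, lg := decodeState(data)
	fmt.Println(term, voted, len(lg)) // prints: 3 1 2
}
```

The decode order matching the encode order is the one thing that actually matters here; gob carries no field labels for these top-level values.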
Step 3: Decide when to call these two functions
You now need to determine at what points in the Raft protocol your servers are required to persist their state, and insert calls to persist() in those places. There is already a call to readPersist() in Raft.Make(). Once you've done this, you should pass the remaining tests. You may want to first try to pass the "basic persistence" test (go test -run 'TestPersist12C'), and then tackle the remaining ones (go test -run 2C).
The basic idea: whenever any of the three persistent values changes, we need to persist; the locking question can be settled afterwards.
So search for every write to rf.currentTerm, rf.votedFor, and rf.log.
Going through the search results, I added persist() calls at each of those places.
Since readPersist is only needed when a machine restarts after a crash, the existing call in Make() is enough; no other call sites are required.
Notice that every call to persist() is already inside a locked region, so persist() itself does not need to take the lock.
readPersist, on the other hand, does need to lock.
Step 4: From the hints
In order to pass some of the challenging tests towards the end, such as those marked "unreliable", you will need to implement the optimization to allow a follower to back up the leader's nextIndex by more than one entry at a time. See the description in the extended Raft paper starting at the bottom of page 7 and top of page 8 (marked by a gray line). The paper is vague about the details; you will need to fill in the gaps, perhaps with the help of the 6.824 Raft lectures.
The relevant passage from the paper:
If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry.
We believe the protocol the authors probably want you to follow is:
- If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
- If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
- Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
- If it does not find an entry with that term, it should set nextIndex = conflictIndex.
Based on the above, first add a ConflictIndex and a ConflictTerm field to the reply.
They only need to be set when reply.Success == false, so only the two early returns in the AppendEntries handler matter. Following the description above, I changed the code as follows:
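A minimal standalone sketch of that decision, following the four rules quoted above (conflictInfo and its parameter names are invented for this demo; the real logic lives inline in the AppendEntries handler):

```go
package main

import "fmt"

const None = -1

type Log struct{ Term int }

// conflictInfo computes what a follower should report when rejecting an
// AppendEntries RPC, per the fast-backup protocol.
func conflictInfo(log []Log, prevLogIndex, prevLogTerm int) (conflictIndex, conflictTerm int, ok bool) {
	// Rule 1: the follower does not have prevLogIndex in its log at all:
	// conflictIndex = len(log), conflictTerm = None.
	if prevLogIndex >= len(log) {
		return len(log), None, false
	}
	// Rule 2: the follower has the entry but the term does not match:
	// report that term and the first index holding it.
	if log[prevLogIndex].Term != prevLogTerm {
		conflictTerm = log[prevLogIndex].Term
		for i := 0; i < len(log); i++ {
			if log[i].Term == conflictTerm {
				return i, conflictTerm, false
			}
		}
	}
	return 0, None, true // terms match: reply.Success can be true
}

func main() {
	log := []Log{{0}, {1}, {2}, {2}, {2}}
	ci, ct, ok := conflictInfo(log, 3, 3) // leader thinks index 3 has term 3
	fmt.Println(ci, ct, ok)               // 2 2 false: first index of term 2
}
```

With this reply in hand, the leader can jump nextIndex past the entire conflicting term instead of backing up one entry per RPC.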
PASS 2C
Test race
zyx@zyx-virtual-machine:~/Desktop/mit6824/6.824/src/raft$ go test -race
Test (2A): initial election ...
... Passed -- 3.1 3 38 0
Test (2A): election after network failure ...
... Passed -- 4.5 3 88 0
Test (2B): basic agreement ...
... Passed -- 1.3 5 32 3
Test (2B): agreement despite follower disconnection ...
... Passed -- 6.6 3 97 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.9 5 148 3
Test (2B): concurrent Start()s ...
... Passed -- 0.8 3 10 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 4.9 3 109 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 37.8 5 2198 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.3 3 28 12
Test (2C): basic persistence ...
... Passed -- 5.1 3 224 6
Test (2C): more persistence ...
... Passed -- 18.7 5 2182 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.5 3 51 4
Test (2C): Figure 8 ...
... Passed -- 34.7 5 26854 37
Test (2C): unreliable agreement ...
... Passed -- 12.4 5 326 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 44.2 5 2680 461
Test (2C): churn ...
... Passed -- 16.7 5 833 254
Test (2C): unreliable churn ...
... Passed -- 19.1 5 1403 174
PASS
ok raft 219.623s
Notice, however, that the RPC count for Test (2C): Figure 8 above (26854) is very large; occasionally go test -race fails, complaining about exceeding race mode's limit of 8192 goroutines.
Reading the code of TestFigure82C,
I saw that it crashes servers after a leader is elected.
Looking at how the crash is implemented,
I found that it calls Kill(), which I had not implemented at all.
Implement Kill()
Kill() simply means: once a node notices it has been killed, it shuts down immediately.
I used a Go channel for this, as before.
Re-run the tests.
Test All with time
Test 100 times
BUG 1: TestFigure8Unreliable2C
Running the tests many times, this case fails about 1 run in 10. The failure output:
Test (2C): Figure 8 (unreliable) ...
2019/05/01 09:49:14 apply error: commit index=235 server=0 7998 != server=2 4299
exit status 1
FAIL raft 28.516s
The two indexes disagree: it seems two nodes ended up with different log entries at the same index, even though committed entries must be identical.
With no other leads, I went back through the student's guide:
https://thesquareplanet.com/blog/students-guide-to-raft/
Make sure you fully understand every point in it, and that your code actually acts on each one.
I found one problem: after receiving a reply, you must compare currentTerm against args.Term. That is, in both places that send an RPC, check after getting the reply whether currentTerm still equals args.Term; if not, return immediately and ignore the reply.
I added the following to the two sender functions:
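The shape of the check is roughly this (staleReply is a name invented for this sketch; in the real senders the comparison is inlined right after the Call returns, while holding the lock):

```go
package main

import (
	"fmt"
	"sync"
)

type Raft struct {
	mu          sync.Mutex
	currentTerm int
}

// staleReply reports whether a reply to an RPC sent with argsTerm should
// be ignored: if currentTerm advanced while the RPC was in flight, the
// reply belongs to an old term and acting on it is unsafe.
func (rf *Raft) staleReply(argsTerm int) bool {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm != argsTerm
}

func main() {
	rf := &Raft{currentTerm: 2}
	argsTerm := 2      // term at the moment the RPC was sent
	rf.currentTerm = 3 // term changed while the RPC was in flight
	fmt.Println(rf.staleReply(argsTerm)) // true: drop this reply
}
```

Without this guard, a delayed reply from an old term can update nextIndex/matchIndex or trigger a step-down based on state that no longer exists, which is exactly the kind of divergence the failure above showed.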
Test script
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 10; i++))
do
for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
do #replace job name here
(go test -run TestFigure8Unreliable2C ) &> ./res/$c &
done
sleep 40
grep -nr "FAIL.*raft.*" res
done
Results before the fix
Results after the fix
BUG 2
Test (2B): no agreement if too many followers disconnect ...
--- FAIL: TestFailNoAgree2B (2.35s)
config.go:465: one(10) failed to reach agreement
This bug shows up very rarely: sometimes around 10 failures in 1000 runs, sometimes none in 3000 runs.
After adding a lot of logging and spending a long time on repeated runs, I found the cause.
Test script
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; ; i++))  # loop until a failure is found
do
if (($((i % 10)) == 0)); then
rm res -rf
mkdir res
echo $i
fi
for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
do
(go test -run TestFailNoAgree2B) &> ./res/$c &
done
sleep 7
if grep -nr "FAIL.*raft.*" res; then
exit 1
fi
done
Output
2019/05/07 21:37:23.027238 SEND IN MSG (1), applyMsg:10
2019/05/07 21:37:23.389237 SEND IN MSG (2), applyMsg:10
2019/05/07 21:37:23.389254 SEND IN MSG (3), applyMsg:10
2019/05/07 21:37:23.389261 SEND IN MSG (4), applyMsg:10
2019/05/07 21:37:23.389472 0 at 1 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.406102 4 be follow so curterm++ now is 1
2019/05/07 21:37:23.406188 granted Vote (4)
2019/05/07 21:37:23.406633 1 be follow so curterm++ now is 1
2019/05/07 21:37:23.406695 granted Vote (1)
2019/05/07 21:37:23.406874 CANDIDATE: 0 receive enough vote and becoming a new leader
2019/05/07 21:37:23.425553 3 be follow so curterm++ now is 1
2019/05/07 21:37:23.425675 2 be follow so curterm++ now is 1
2019/05/07 21:37:23.425745 granted Vote (2)
2019/05/07 21:37:23.429599 3 at 2 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.429635 granted Vote (3)
2019/05/07 21:37:23.429815 1 be follow so curterm++ now is 2
2019/05/07 21:37:23.429839 granted Vote (1)
2019/05/07 21:37:23.430065 2 be follow so curterm++ now is 2
2019/05/07 21:37:23.430098 granted Vote (2)
2019/05/07 21:37:23.430163 CANDIDATE: 3 receive enough vote and becoming a new leader
2019/05/07 21:37:23.430172 be Ledaer (3)
2019/05/07 21:37:23.430177 send Hearbeat (3)
2019/05/07 21:37:23.430194 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430368 4 be follow so curterm++ now is 2
2019/05/07 21:37:23.430386 receive Hearbeat (4)
2019/05/07 21:37:23.430489 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430527 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430612 receive Hearbeat (4)
2019/05/07 21:37:23.430672 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430684 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.430786 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.430891 receive Hearbeat (1)
2019/05/07 21:37:23.430945 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430954 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431046 receive Hearbeat (2)
2019/05/07 21:37:23.431129 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431143 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.431277 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.431312 granted Vote (4)
2019/05/07 21:37:23.431444 receive Hearbeat (1)
2019/05/07 21:37:23.431459 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431566 receive Hearbeat (2)
2019/05/07 21:37:23.431620 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431695 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.433226 be Ledaer (0)
2019/05/07 21:37:23.433240 SEND IN MSG (0), applyMsg:10
2019/05/07 21:37:23.433277 SEND IN MSG SUCCESS (0), applyMsg:10
2019/05/07 21:37:23.433321 send Hearbeat from 0 to (4)
2019/05/07 21:37:23.433504 send Hearbeat from 0 to (1)
2019/05/07 21:37:23.433696 send Hearbeat from 0 to (2)
2019/05/07 21:37:23.433839 send Hearbeat from 0 to (3)
2019/05/07 21:37:23.433975 0 be follow so curterm++ now is 2
2019/05/07 21:37:23.434064 receive Hearbeat (0)
2019/05/07 21:37:23.434121 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530457 send Hearbeat (3)
2019/05/07 21:37:23.530501 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.530628 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.530647 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.530689 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.530822 receive Hearbeat (0)
2019/05/07 21:37:23.530831 receive Hearbeat (2)
2019/05/07 21:37:23.530822 receive Hearbeat (1)
2019/05/07 21:37:23.530911 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530924 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530955 receive Hearbeat (4)
2019/05/07 21:37:23.530992 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.531030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.630715 send Hearbeat (3)
2019/05/07 21:37:23.630745 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.630775 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.630827 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.630869 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.630914 receive Hearbeat (4)
2019/05/07 21:37:23.630928 receive Hearbeat (1)
2019/05/07 21:37:23.630961 receive Hearbeat (0)
2019/05/07 21:37:23.631001 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631040 receive Hearbeat (2)
2019/05/07 21:37:23.631052 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631118 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.730978 send Hearbeat (3)
2019/05/07 21:37:23.731032 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.731043 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.731036 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.731202 receive Hearbeat (4)
2019/05/07 21:37:23.731209 receive Hearbeat (1)
2019/05/07 21:37:23.731264 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731274 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.731391 receive Hearbeat (2)
2019/05/07 21:37:23.731498 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731544 receive Hearbeat (0)
2019/05/07 21:37:23.731801 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731869 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831101 send Hearbeat (3)
2019/05/07 21:37:23.831131 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.831294 receive Hearbeat (4)
2019/05/07 21:37:23.831356 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831374 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.831457 receive Hearbeat (0)
2019/05/07 21:37:23.831512 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831521 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.831619 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.831687 receive Hearbeat (1)
2019/05/07 21:37:23.831771 receive Hearbeat (2)
2019/05/07 21:37:23.831782 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831841 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931542 send Hearbeat (3)
2019/05/07 21:37:23.931608 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.931619 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.931809 receive Hearbeat (1)
2019/05/07 21:37:23.931810 receive Hearbeat (4)
2019/05/07 21:37:23.931836 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.931912 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931957 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.932056 receive Hearbeat (0)
2019/05/07 21:37:23.932139 receive Hearbeat (2)
2019/05/07 21:37:23.932170 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932301 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932326 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.031737 send Hearbeat (3)
2019/05/07 21:37:24.031778 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.031788 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.031847 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.032029 receive Hearbeat (1)
2019/05/07 21:37:24.032059 receive Hearbeat (0)
2019/05/07 21:37:24.032084 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.032204 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032324 receive Hearbeat (2)
2019/05/07 21:37:24.032363 receive Hearbeat (4)
2019/05/07 21:37:24.032427 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032525 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032573 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132327 send Hearbeat (3)
2019/05/07 21:37:24.132377 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.132603 receive Hearbeat (4)
2019/05/07 21:37:24.132715 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132734 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.132878 receive Hearbeat (0)
2019/05/07 21:37:24.132981 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132999 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.133252 receive Hearbeat (1)
2019/05/07 21:37:24.133277 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.133379 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.133436 receive Hearbeat (2)
2019/05/07 21:37:24.133602 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.232982 send Hearbeat (3)
2019/05/07 21:37:24.233076 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.233182 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.233387 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.233423 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.233534 receive Hearbeat (1)
2019/05/07 21:37:24.233651 receive Hearbeat (2)
2019/05/07 21:37:24.233680 receive Hearbeat (0)
2019/05/07 21:37:24.233699 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233795 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233887 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233927 receive Hearbeat (4)
2019/05/07 21:37:24.234070 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.333358 send Hearbeat (3)
2019/05/07 21:37:24.333395 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.333438 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.333444 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.333744 receive Hearbeat (1)
2019/05/07 21:37:24.333745 receive Hearbeat (2)
2019/05/07 21:37:24.333801 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.333862 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334212 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334231 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334237 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334243 receive Hearbeat (4)
2019/05/07 21:37:24.334251 receive Hearbeat (0)
2019/05/07 21:37:24.433686 send Hearbeat (3)
2019/05/07 21:37:24.433717 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.433911 receive Hearbeat (4)
2019/05/07 21:37:24.434052 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434075 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.434211 receive Hearbeat (0)
2019/05/07 21:37:24.434308 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434322 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.434446 receive Hearbeat (1)
2019/05/07 21:37:24.434549 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434569 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.434694 receive Hearbeat (2)
2019/05/07 21:37:24.434787 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534242 send Hearbeat (3)
2019/05/07 21:37:24.534276 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.534342 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.534430 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.534579 receive Hearbeat (4)
2019/05/07 21:37:24.534644 receive Hearbeat (1)
2019/05/07 21:37:24.534660 receive Hearbeat (2)
2019/05/07 21:37:24.534673 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.534686 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534793 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534873 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534934 receive Hearbeat (0)
2019/05/07 21:37:24.535063 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.634898 send Hearbeat (3)
2019/05/07 21:37:24.634944 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.635082 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.635106 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.635230 receive Hearbeat (0)
2019/05/07 21:37:24.635231 receive Hearbeat (1)
2019/05/07 21:37:24.635248 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.635309 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635343 receive Hearbeat (2)
2019/05/07 21:37:24.635386 receive Hearbeat (4)
2019/05/07 21:37:24.635420 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635445 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635475 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735184 send Hearbeat (3)
2019/05/07 21:37:24.735229 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.735543 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.735633 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735650 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.735670 receive Hearbeat (1)
2019/05/07 21:37:24.735767 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735777 receive Hearbeat (2)
... the lines above repeat in an endless loop until the test times out
Let me roughly explain how this happens. There are 5 servers in total.
First, candidate 0 wins 3 votes and becomes leader; node 3's vote is not among them.
While handling the vote request, node 3 sees that candidate 0's term is 1, so it first turns itself into a follower and raises its own term to 1, and only then starts checking whether to grant candidate 0 its vote.
During that check, node 3's election timer fires, and it launches an election of its own.
Only afterwards does the timer goroutine learn that node 3 granted its vote to candidate 0 and reset itself; by then node 3 has already started its own election.
When it started that election, node 3's currentTerm was 1; after the increment it became 2, so node 3 requests votes with term 2.
Naturally it can win. Meanwhile the tester has already been told that node 0 is the leader and has submitted a command to leader 0, and leader 0 accepted the command still believing itself leader, because at that moment it had received neither node 3's RequestVote nor node 3's heartbeat.
After node 0 swallowed the command and reported itself leader, the tester began waiting for that command to reach agreement on all nodes.
At that point node 3's heartbeat arrived; node 0's term was lower than node 3's, so it became a follower, and the entry was dropped.
The command was simply gone.
Conclusion: after discussing this with a colleague — since the command was never committed, Raft is allowed to lose uncommitted entries; so arguably the test should not assume that a command handed to a leader will necessarily reach agreement before it has been committed.
BUG 3: found by reading the code
The sort must be descending, i.e. largest first; my original code sorted ascending. Because the tests all use an odd number of servers, it was never caught.
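What the computation is supposed to do: sort a copy of matchIndex in descending order, and the value at index len/2 is then replicated on a majority. A standalone sketch (majorityIndex is a demo name):

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest log index replicated on a majority
// of servers: with matchIndex sorted descending, at least len/2+1
// servers have an index >= the value at position len/2.
func majorityIndex(matchIndex []int) int {
	cp := make([]int, len(matchIndex))
	copy(cp, matchIndex)
	sort.Sort(sort.Reverse(sort.IntSlice(cp))) // largest first
	return cp[len(cp)/2]
}

func main() {
	fmt.Println(majorityIndex([]int{5, 3, 4, 1, 2})) // 3: indices 5,4,3 cover a majority
}
```

With an ascending sort, cp[len(cp)/2] lands on the wrong side of the median for even cluster sizes (picking a value replicated on only half the servers), while for odd sizes both orders happen to pick the same median value, which is why the odd-sized test clusters never exposed it.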
Test script for 1000 runs
Estimated time: 12 hours
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 100; i++))
do
for ((c = $((i*6)); c < $(( (i+1)*6)); c++))
do
(go test -race) &> ./res/$c &
sleep 15
done
sleep 90
if grep -nr "WARNING.*" res; then
echo "WARNING: DATA RACE"
fi
if grep -nr "FAIL.*raft.*" res; then
echo "found fail"
fi
done
CONCISE version of the code
Finally I reorganized everything into a concise version following the paper: about 400 lines of core code excluding comments, with the key points of the paper interleaved as code comments.
The locking scheme of this version has one weakness. For every channel I created myself, I was careful to unlock before using it.
But I did not manage that for applyCh.
So the upper-layer application must dedicate a separate goroutine to listening for messages on applyCh, and that path must avoid calling into the Raft server's lock, or any method that acquires it.
I only discovered this while writing Lab 3B; see my Lab 3B guide for the details.
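On the application side, the safe pattern is a dedicated goroutine that drains applyCh and never touches the Raft lock. A sketch of that consumer (consume and applied are demo names; ApplyMsg matches the listing below):

```go
package main

import "fmt"

type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

// consume drains applyCh on its own goroutine. Crucially, it only touches
// the application's own state; if it called back into a Raft method that
// takes rf.mu, the Raft goroutine blocked on the unbuffered applyCh send
// could deadlock against it.
func consume(applyCh chan ApplyMsg, applied chan []int) {
	var indexes []int
	for msg := range applyCh {
		if msg.CommandValid {
			indexes = append(indexes, msg.CommandIndex)
		}
	}
	applied <- indexes
}

func main() {
	applyCh := make(chan ApplyMsg) // unbuffered, as in the labs
	applied := make(chan []int)
	go consume(applyCh, applied)
	for i := 1; i <= 3; i++ {
		applyCh <- ApplyMsg{CommandValid: true, Command: i * 10, CommandIndex: i}
	}
	close(applyCh)
	fmt.Println(<-applied) // [1 2 3]
}
```

Because Raft sends on applyCh while holding its lock in this version, the consumer must always be ready to receive; a dedicated loop like this guarantees that.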
package raft
import (
"bytes"
"labgob"
"log"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
)
import "labrpc"
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}
type State int
const (
Follower State = iota // value --> 0
Candidate // value --> 1
Leader // value --> 2
)
const NULL int = -1
type Log struct {
Term int "term when entry was received by leader"
Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// state a Raft server must maintain.
state State
//Persistent state on all servers:(Updated on stable storage before responding to RPCs)
currentTerm int "latest term server has seen (initialized to 0 increases monotonically)"
votedFor int "candidateId that received vote in current term (or null if none)"
log []Log "log entries;(first index is 1)"
//Volatile state on all servers:
commitIndex int "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
lastApplied int "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"
//Volatile state on leaders:(Reinitialized after election)
nextIndex []int "for each server,index of the next log entry to send to that server"
matchIndex []int "for each server,index of highest log entry known to be replicated on server(initialized to 0, im)"
//channel
applyCh chan ApplyMsg // from Make()
killCh chan bool //for Kill()
//handle rpc
voteCh chan bool
appendLogCh chan bool
}
// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isleader = (rf.state == Leader)
return term, isleader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
data := w.Bytes()
rf.persister.SaveRaftState(data)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var voteFor int
var clog []Log
if d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil {
log.Fatalf("readPersist ERROR for server %v", rf.me)
} else {
rf.mu.Lock()
rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
rf.mu.Unlock()
}
}
// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
Term int "candidate’s term"
CandidateId int "candidate requesting vote"
LastLogIndex int "index of candidate’s last log entry (§5.4)"
LastLogTerm int "term of candidate’s last log entry (§5.4)"
}
// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
Term int "currentTerm, for candidate to update itself"
VoteGranted bool "true means candidate received vote"
}
//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if (args.Term > rf.currentTerm) {//all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.VoteGranted = false
if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
// Reply false if term < currentTerm (§5.1) If votedFor is not null and not candidateId,
} else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
//If the logs have last entries with different terms, then the log with the later term is more up-to-date.
// If the logs end with the same term, then whichever log is longer is more up-to-date.
// Reply false if candidate’s log is at least as up-to-date as receiver’s log
} else {
//grant vote
rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.state = Follower
rf.persist()
send(rf.voteCh) //because If election timeout elapses without receiving granting vote to candidate, so wake up
}
}
//RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
type AppendEntriesArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
PrevLogIndex int "index of log entry immediately preceding new ones"
PrevLogTerm int "term of prevLogIndex entry"
Entries []Log "log entries to store (empty for heartbeat;may send more than one for efficiency)"
LeaderCommit int "leader’s commitIndex"
}
type AppendEntriesReply struct {
Term int "currentTerm, for leader to update itself"
Success bool "true if follower contained entry matching prevLogIndex and prevLogTerm"
ConflictIndex int "the first index it stores for that conflict term"
ConflictTerm int "the term of the conflicting entry"
}
//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.Success = false
reply.ConflictTerm = NULL
reply.ConflictIndex = 0
//1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
prevLogIndexTerm := -1
logSize := len(rf.log)
if args.PrevLogIndex >= 0 && args.PrevLogIndex < len(rf.log) {
prevLogIndexTerm = rf.log[args.PrevLogIndex].Term
}
if prevLogIndexTerm != args.PrevLogTerm {
reply.ConflictIndex = logSize
if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
//it should return with conflictIndex = len(log) and conflictTerm = None.
} else { //If a follower does have prevLogIndex in its log, but the term does not match
reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
i := 0
for ; i < logSize; i++ {//and then search its log for
if rf.log[i].Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
reply.ConflictIndex = i
break
}
}
}
return
}
//2. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {return}
index := args.PrevLogIndex
for i := 0; i < len(args.Entries); i++ {
index++
if index < logSize {
if rf.log[index].Term == args.Entries[i].Term {
continue
} else {//3. If an existing entry conflicts with a new one (same index but different terms),
rf.log = rf.log[:index]//delete the existing entry and all that follow it (§5.3)
}
}
rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
rf.persist()
break;
}
//5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
rf.updateLastApplied()
}
reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
//Leader Section:
func (rf *Raft) startAppendLog() {
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
for {
rf.mu.Lock();
if rf.state != Leader {
rf.mu.Unlock()
return
} //send initial empty AppendEntries RPCs (heartbeat) to each server
args := AppendEntriesArgs{
rf.currentTerm,
rf.me,
rf.getPrevLogIdx(idx),
rf.getPrevLogTerm(idx),
//If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
//nextIndex > last log index, rf.log[rf.nextIndex[idx]:] will be empty then like a heartbeat
append(make([]Log,0),rf.log[rf.nextIndex[idx]:]...),
rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
ret := rf.sendAppendEntries(idx, &args, reply)
rf.mu.Lock();
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
rf.mu.Unlock()
return
}
if reply.Success {//If successful:update nextIndex and matchIndex for follower
rf.matchIndex[idx] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[idx] = rf.matchIndex[idx] + 1
rf.updateCommitIndex()
rf.mu.Unlock()
return
} else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
tarIndex := reply.ConflictIndex //If it does not find an entry with that term
if reply.ConflictTerm != NULL {
logSize := len(rf.log) //first search its log for conflictTerm
for i := 0; i < logSize; i++ {//if it finds an entry in its log with that term,
if rf.log[i].Term != reply.ConflictTerm { continue }
for i < logSize && rf.log[i].Term == reply.ConflictTerm { i++ }//set nextIndex to be the one
tarIndex = i //beyond the index of the last entry in that term in its log
}
}
rf.nextIndex[idx] = tarIndex;
rf.mu.Unlock()
}
}
}(i)
}
}
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
index := -1
term := rf.currentTerm
isLeader := (rf.state == Leader)
//If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
if isLeader {
index = rf.getLastLogIdx() + 1
newLog := Log{
rf.currentTerm,
command,
}
rf.log = append(rf.log,newLog)
rf.persist()
}
return index, term, isLeader
}
//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
rf.matchIndex[rf.me] = len(rf.log) - 1
copyMatchIndex := make([]int,len(rf.matchIndex))
copy(copyMatchIndex,rf.matchIndex)
sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
N := copyMatchIndex[len(copyMatchIndex)/2]
if N > rf.commitIndex && rf.log[N].Term == rf.currentTerm {
rf.commitIndex = N
rf.updateLastApplied()
}
}
func (rf *Raft) beLeader() {
if rf.state != Candidate {
return
}
rf.state = Leader
//initialize leader data
rf.nextIndex = make([]int,len(rf.peers))
rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
rf.nextIndex[i] = rf.getLastLogIdx() + 1
}
}
//end Leader section
//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower implemented in AppendEntries RPC Handler
func (rf *Raft) beCandidate() { //Reset election timer are finished in caller
rf.state = Candidate
rf.currentTerm++ //Increment currentTerm
rf.votedFor = rf.me //vote myself first
rf.persist()
//ask for other's vote
go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
rf.mu.Lock()
args := RequestVoteArgs{
rf.currentTerm,
rf.me,
rf.getLastLogIdx(),
rf.getLastLogTerm(),
}
rf.mu.Unlock()
var votes int32 = 1
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
reply := &RequestVoteReply{}
ret := rf.sendRequestVote(idx,&args,reply)
if ret {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.beFollower(reply.Term)
return
}
if rf.state != Candidate || rf.currentTerm != args.Term{
return
}
if reply.VoteGranted {
atomic.AddInt32(&votes,1)
} //If votes received from majority of servers: become leader
if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
rf.beLeader()
send(rf.voteCh) //once leader, wake the 'select' goroutine so it sends heartbeats immediately
}
}
}(i)
}
}
//end Candidate section
//Follower Section:
func (rf *Raft) beFollower(term int) {
rf.state = Follower
rf.votedFor = NULL
rf.currentTerm = term
rf.persist()
}
//end Follower section
//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
curLog := rf.log[rf.lastApplied]
applyMsg := ApplyMsg{
true,
curLog.Command,
rf.lastApplied,
}
rf.applyCh <- applyMsg
}
}
// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
send(rf.killCh)
}
//Helper function
func send(ch chan bool) {
select {
case <-ch: //if already set, consume it then resent to avoid block
default:
}
ch <- true
}
func (rf *Raft) getPrevLogIdx(i int) int {
return rf.nextIndex[i] - 1
}
func (rf *Raft) getPrevLogTerm(i int) int {
prevLogIdx := rf.getPrevLogIdx(i)
if prevLogIdx < 0 {
return -1
}
return rf.log[prevLogIdx].Term
}
func (rf *Raft) getLastLogIdx() int {
return len(rf.log) - 1
}
func (rf *Raft) getLastLogTerm() int {
idx := rf.getLastLogIdx()
if idx < 0 {
return -1
}
return rf.log[idx].Term
}
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = NULL
rf.log = make([]Log,1) //(first index is 1)
rf.commitIndex = 0
rf.lastApplied = 0
rf.applyCh = applyCh
//these channels are consumed only by the 'select' goroutine below; a buffer of 1 lets senders avoid blocking
rf.voteCh = make(chan bool,1)
rf.appendLogCh = make(chan bool,1)
rf.killCh = make(chan bool,1)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
//from hint: the tester requires that the leader send heartbeat RPCs no more than ten times per second
heartbeatTime := 100 * time.Millisecond
//from hint :You'll need to write code that takes actions periodically or after delays in time.
// The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
go func() {
for {
select {
case <-rf.killCh:
return
default:
}
electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
switch state {
case Follower, Candidate:
select {
case <-rf.voteCh:
case <-rf.appendLogCh:
case <-time.After(electionTime):
rf.mu.Lock()
rf.beCandidate() //become candidate, reset election timer, then start election
rf.mu.Unlock()
}
case Leader:
rf.startAppendLog()
time.Sleep(heartbeatTime)
}
}
}()
return rf
}