Raft是一种分布式一致性算法,目的是在分布式系统中实现可靠的状态机复制。
从这个Lab开始上强度了,实现和 debug 过程都很繁琐。
Part 3A: leader election
Part A的目标是实现 Leader 选举。
三种状态的节点的工作
对于 PartA,三种 State 的节点的工作分别为:
- Follower: 监测自身的心跳状态,如果一段时间内没有收到有效的(term 更大或相等)来自 Leader 的心跳或 Candidate 的投票请求,则变为 Candidate
- Candidate: 成为 Candidate 后立即开始选举,自增 term 并给除自己外的所有节点发送投票请求,同意票超过半数则将自己变为 Leader;如果选举失败或超时,则再次开始选举
- Leader: 不断为其他所有节点发送 AppendEntries RPC
三类节点相互转换的条件有:
- Follower: 监测到自身的心跳状态异常时,将自己变为 Candidate
- Candidate: 选举成功后将自己变为 Leader; 若收到 term 大于等于自身的 Leader 的心跳则将自己变为Follower;若收到的心跳或投票请求的term大于自身的term则将自己变为Follower
- Leader: 收到 term 大于自身的投票请求将自己变为 Follower; 若发送的 AppendEntries RPC 得到的 reply 中的 term 大于自身的 term 则将自己变为 Follower
具体实现
ticker()与心跳检测
Lab框架给出的ticker()作用是每隔随机50~350ms检查当前节点的心跳状态。对于心跳状态的实现,我采用记录每一次收到心跳的时间戳的方式,每次ticker只需要获取当前时间和上一次收到心跳的时间差,然后和超时时间进行对比即可。
func (rf *Raft) ticker() {
for rf.killed() == false {
// Your code here (3A)
// Check if a leader election should be started.
rf.mu.Lock()
if rf.state == FOLLOWER && time.Since(rf.electionTimer) > time.Duration(250+(rand.Int63()%100))*time.Millisecond {
rf.becomeCandidate()
}
rf.mu.Unlock()
// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
一开始实现的时候我是想用Timer做大部分的时间处理,然后就看到了Lab hints中有一条写到:
You’ll need to write code that takes actions periodically or after delays in time. The easiest way to do this is to create a goroutine with a loop that calls time.Sleep(); see the
ticker()
goroutine thatMake()
creates for this purpose. Don’t use Go’stime.Timer
ortime.Ticker
, which are difficult to use correctly.
所以我很干脆的弃用了 Timer,转而使用 Sleep,后面还写出了
go func() {
time.Sleep(time.Duration(700+(rand.Int63()%300)) * time.Millisecond)
resultChan <- false
}()
这种看起来有点蠢的代码用来判断选举是否超时,这里我个人觉得肯定是用 After/Timer+select 更优雅。
选举
按照论文的图2实现即可
func (rf *Raft) becomeCandidate() {
rf.state = CANDIDATE
for !rf.killed() {
rf.currentTerm++
rf.votedFor = &rf.me
resultChan := make(chan bool)
go func() {
time.Sleep(time.Duration(700+(rand.Int63()%300)) * time.Millisecond)
resultChan <- false
}()
go rf.startElection(len(rf.peers), resultChan)
if <-resultChan {
rf.becomeLeader()
break
} else {
rf.mu.Unlock()
// sleep for a while to update state
time.Sleep(time.Duration(100+(rand.Int63()%140)) * time.Millisecond)
rf.mu.Lock()
if rf.state != CANDIDATE {
break
}
}
}
}
一开始把 rf.becomeLeader() 写进了 startElection() 中,这样做的问题是超时以后原来的协程仍在工作,导致应当被舍弃的选举流程继续下去了,可能会出现多个 Leader。
func (rf *Raft) startElection(len int, resultChan chan (bool)) {
voteNum := 1
finished := 1
args := RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me}
tempTerm := rf.currentTerm
termMax := true
var tempMu sync.Mutex
cond := sync.NewCond(&tempMu)
for i := 0; i < len && termMax; i++ {
if i == rf.me {
continue
}
go func(i int) {
var reply RequestVoteReply
rf.sendRequestVote(i, &args, &reply)
tempMu.Lock()
if reply.VoteGranted {
voteNum++
}
if reply.Term > tempTerm {
tempTerm = reply.Term
}
finished++
tempMu.Unlock()
cond.Broadcast()
}(i)
}
tempMu.Lock()
for voteNum <= len/2 && finished != len && termMax {
cond.Wait()
}
if voteNum > len/2 && termMax {
resultChan <- true
}
if !termMax {
rf.currentTerm = tempTerm
rf.becomeFollower()
}
tempMu.Unlock()
resultChan <- false
}
选举中如果遇到节点的term大于自己,则需要放弃选举并退回到 Follower,更新自己的 term。为了防止竞态,退回到 Follower 和更新 term 需要在协程外进行。
投票
节点收到投票时,首先比较发起选举的节点term和自己的term:
发起方<己方:无效选举
发起方=己方:
- 若己方为 Follower 且无投票对象或投票对象是此次发起方:投票并重置心跳计时器
- 若己方为 Candidate:此时己方必定有投票对象自己,拒绝投票
- Leader 永远不会投票
发起方>己方:投票并更新term,如果己方不是 Follower 就先变成 Follower 再进行投票
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (3A, 3B).
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.VoteGranted = false
return
} else if args.Term == rf.currentTerm {
if (rf.votedFor == nil || *rf.votedFor == args.CandidateId) && rf.state == FOLLOWER {
rf.electionTimer = time.Now()
rf.votedFor = &args.CandidateId
reply.VoteGranted = true
} else {
reply.VoteGranted = false
}
} else {
if rf.state != FOLLOWER {
rf.becomeFollower()
}
rf.currentTerm = args.Term
rf.votedFor = &args.CandidateId
reply.VoteGranted = true
}
}
心跳
func (rf *Raft) becomeLeader() {
rf.state = LEADER
rf.votedFor = nil
go rf.leaderWorker()
}
func (rf *Raft) leaderWorker() {
rf.mu.Lock()
if rf.state != LEADER {
rf.mu.Unlock()
return
}
tempTerm := rf.currentTerm
rf.mu.Unlock()
var wg sync.WaitGroup
termMax := true
for !rf.killed() && termMax {
rf.mu.Lock()
var tempMu sync.Mutex
args := AppendEntries{Term: rf.currentTerm, LeaderId: rf.me}
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
var reply AppendReply
wg.Add(1)
go func(i int) {
rf.sendAppendEntries(i, &args, &reply)
tempMu.Lock()
if reply.Term > tempTerm {
tempTerm = reply.Term
termMax = false
}
tempMu.Unlock()
wg.Done()
}(i)
}
rf.mu.Unlock()
ms := 100 + (rand.Int63() % 140)
time.Sleep(time.Duration(ms) * time.Millisecond)
wg.Wait()
}
if !termMax {
rf.mu.Lock()
defer rf.mu.Unlock()
rf.becomeFollower()
rf.currentTerm = tempTerm
}
}
和选举类似,如果遇到节点的term大于自己,则需要退回到 Follower,更新自己的 term。
总结
自己调试过程中得到的几个教训:
注意锁的作用范围,防止死锁
粗粒度锁+循环的部分一定要空一段时间出来让RPC处理获取锁来更新节点的状态
多重复测试,有些错误发生概率比较低
和我一样使用 print 大法调试的话,多 print 勤 print 不要怕多( 比如我把每一次投票都 print 发现有时候节点的 term 没有被更新,才看到其中一个 args 写成了 reply。
Part 3B: log
Part B 要求我们实现Raft系统中日志的可靠复制,也就是论文图2中的所有内容。
对PartA的补充
添加日志部分后,PartA 中提到的部分机制需要一些修改。
节点为日志落后的参选者投票。当投票发起方的 term >= 投票者的 term 时, 如果对方的日志不如己方新,则拒绝投票。
再者,还需要一个协程应用已经 commit 的日志。
然后就是 AppendEntries,PartA 中我们已经实现了心跳,也就是不包括日志条目的 AppendEntries RPC。PartB的主要工作就是完成这一部分。
系统如何保证日志的可靠复制?
- 当leader节点确认到系统中多数节点收到日志时才会认为日志有效,也就意味着这次被传输的日志已经存在于多数节点
- 节点绝不会为日志落后的参选者投票(如果两日志最后条目的有不同term值,那么term较大的日志更新。如果两日志条目term相同,则index较大的日志更新)
- 节点收到 term 更大的节点的任何消息后都会更新自己的term
上面三点保证了即使leader节点崩溃以后,新选出的leader一定拥有曾得到commit的日志,然后使这些日志在多数节点得到commit以及执行,并且崩溃一段时间的节点重连以后也不会破坏已经commit的日志。
leader如何知道其他节点的log中有多少是正确的?
答案是慢慢试(
节点当选leader后就要维护一个叫 nextIndex 的数组,数组每一项记录着下次发送日志在目标节点的附加起点,初始值为leader日志的最大长度。leader 每次发送追加日志请求时都附带待添加条目的前一个条目的 index 和 term,如果目标节点日志的对应 index 处的日志不存在或是 term 不相同,则认为本次添加失败,leader 减少 nextIndex,重复过程直至存在相同 term 的项,将此项以后的所有条目舍弃并添加 leader 发送的待添加日志。
当然,有时候逐一递减并不是一个好主意,如果目标节点的 commit 日志条目落后太多,leader要发送非常多的追加请求。例子就是 lab 中的 leader backs up quickly over incorrect follower logs 这个 test case,逐一寻找正确的 nextIndex 最终一定会因超时而 fail, 这种情况下我们就需要一些快速回退机制来减少寻找nextIndex的尝试。
所以按照图2的设计慢慢试的话肯定是不行了(,需要在 AppendEntries RPC 的 Reply 结构中添加更多信息来帮助我们快速回退。
NOTE其实逐一回退应该也可以通过 Part B,这里我不用快速回退没法通过的原因会在后面讲到。
选举的补充
Candidate 的投票 RPC 参数需要添加 LastLogIndex 和 LastLogTerm。
index := len(rf.log) - 1
args := RequestVoteArgs{Term: rf.currentTerm, CandidateId: rf.me, LastLogIndex: index, LastLogTerm: rf.log[index].Term}
而接收者部分按照论文图2实现即可:
WARNING这里的代码有细节错误,应当仅在 follower 投票时才更新计时器,后面实现中已改正。虽然这样也能通过所有 test case,但最好还是按照论文说的做。
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (3A, 3B).
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.VoteGranted = false
return
}
if rf.state == FOLLOWER {
rf.electionTimer = time.Now()
}
// requester's logs are newer
newer := (args.LastLogTerm == rf.log[len(rf.log)-1].Term && args.LastLogIndex >= len(rf.log)-1) || args.LastLogTerm > rf.log[len(rf.log)-1].Term
if args.Term == rf.currentTerm {
if (rf.votedFor == nil || *rf.votedFor == args.CandidateId) && newer && rf.state == FOLLOWER {
rf.votedFor = &args.CandidateId
reply.VoteGranted = true
} else {
reply.VoteGranted = false
}
} else {
if rf.state != FOLLOWER {
rf.becomeFollower()
}
rf.currentTerm = args.Term
if newer {
rf.votedFor = &args.CandidateId
reply.VoteGranted = true
} else {
reply.VoteGranted = false
}
}
}
Leader的行为
Leader 上任后,需要立即发送一次空的追加日志条目 RPC 请求作为初始心跳,然后每隔一段时间重复心跳。如果收到了客户端的请求,则写入日志并同步给各节点。当检测到多数节点成功复制该日志后则提交日志并应用于状态机。
首先判断各节点上成功复制的最大日志条目 index 是否等于 leader 的日志长度, 如果相等, 即不存在需要复制的条目,则发送心跳,反之则附加待追加条目。 一开始的时候在这里犯蠢了没有给AppendEntries添加超时检测,导致leader心跳那边的 waitGroup 被卡,调试浪费了好长时间。
var wg sync.WaitGroup
termMax := true
timerCh := make(chan bool)
for !rf.killed() {
rf.mu.Lock()
if rf.state != LEADER {
rf.mu.Unlock()
return
}
logLen := len(rf.log)
tempTerm := rf.currentTerm
var tempMu sync.Mutex
go func() {
time.Sleep(time.Duration(100) * time.Millisecond)
timerCh <- true
}()
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
wg.Add(1)
go func(i int) {
var args AppendEntries
var reply AppendReply
resultChan := make(chan bool)
var result bool
isNeedAppend := rf.matchIndex[i] != logLen-1
if !isNeedAppend {
args = AppendEntries{Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1, PrevLogTerm: rf.log[rf.nextIndex[i]-1].Term, LeaderCommit: rf.commitIndex}
} else {
args = AppendEntries{Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: rf.nextIndex[i] - 1, PrevLogTerm: rf.log[rf.nextIndex[i]-1].Term, Entries: rf.log[rf.nextIndex[i]:], LeaderCommit: rf.commitIndex}
}
go func() {
time.Sleep(time.Duration(10) * time.Millisecond)
resultChan <- false
}()
go func() {
result = rf.sendAppendEntries(i, &args, &reply)
resultChan <- true
}()
if <-resultChan {
tempMu.Lock()
if reply.Term > tempTerm {
tempTerm = reply.Term
termMax = false
} else if reply.Success {
rf.nextIndex[i] = logLen
rf.matchIndex[i] = logLen - 1
} else if result {
if reply.RetryIndex >= 0 {
rf.nextIndex[i] = reply.RetryIndex + 1
}
if reply.RetryTerm >= 0 {
rf.leaderFindNextIndex(i, reply.RetryTerm)
}
}
tempMu.Unlock()
}
wg.Done()
}(i)
}
wg.Wait()
if termMax {
rf.updateCommitIndex()
} else {
rf.becomeFollower()
rf.currentTerm = tempTerm
rf.mu.Unlock()
<-timerCh
return
}
rf.mu.Unlock()
<-timerCh
}
我采用的回退机制是:
- 如果不存在 prevLogIndex 对应的条目:告知 leader 节点目前的最后一项日志的 index,leader 下次从该 index 对应的条目开始尝试。
- 如果 prevLogIndex 存在的条目的 term 不同:告知 leader 当前 index 处条目的实际 term,leader 下次从该 term 或更低的 term 开始尝试。
reply.RetryTerm = -1
reply.RetryIndex = -1
if len(rf.log) <= args.PrevLogIndex {
reply.Success = false
reply.RetryIndex = len(rf.log) - 1
return
}
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
reply.RetryTerm = rf.log[args.PrevLogIndex].Term
return
}
reply.Success = true
rf.currentTerm = args.Term
if rf.state != FOLLOWER {
rf.becomeFollower()
}
if len(args.Entries) > 0 {
rf.log = append(rf.log[:args.PrevLogIndex+1], args.Entries...)
}
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(len(rf.log)-1, args.LeaderCommit)
rf.commitCond.Broadcast()
}
每次 commitIndex 有更新时都会提示 applier 协程应用已提交日志,不过后来我还是觉得不用 Cond 而是用循环不断检测 commitIndex 更好些(
总结
做完感觉难度其实也没有网传的那么夸张,不过调试确实挺痛苦的。
有些问题发生率只有百分之x甚至千分之x,这个脚本对发现问题会很有帮助(
Part 3C: persistence
改了 Make() 以后忘了初始化 follower 的选举计时器,导致连 PartB 都有概率过不了又回去改了俩小时才发现问题,各位不要学我(
更快的回退
实现了 persist() 和 readPersist() 以后首次测试只有 Figure 8 (unreliable) 这个 case Fail 了,后面又测试了 500 次,也确实只有这一个 case 有问题。然而 case 和 persistence 没有关系, 完全没有涉及到状态的保存和恢复,所以这个 fail 的原因完全是因为我的 partB 不够完善(
看了一下 tester 的代码,基本流程是首先添加一条指令并得到确认,然后开始一千次迭代,每次迭代首先向所有自认为是 leader 的节点发送一条指令,然后进行休眠,小概率休眠数百毫秒,大概率休眠不到十毫秒,结束后有概率断开最大序号的 leader 节点, 若断开后节点数量不足 3,则随机选择一个节点,若该节点处于断开状态则重连。当迭代次数达到 200 时,启用长时间的消息重排以模拟网络不稳定的情况。迭代结束后重连所有断开的节点,提交一个新指令,要求 10s 内完成该指令的同步。
造成失败的原因是之前提到的回退机制在目标节点对应 index 的日志条目 term 高于 leader 的日志条目时仍是逐一回退。
- 如果不存在 prevLogIndex 对应的条目:告知 leader 节点目前的最后一项日志的 index,leader 下次从该 index 对应的条目开始尝试。
- 如果 prevLogIndex 存在的条目的 term 不同:
- leader 日志条目 term 更大:告知 leader 当前 index 处条目的实际 term,leader 下次从该 term 或更低的 term 的条目开始尝试。
- 当前节点日志条目的 term 更大: 搜索本地日志中 term <= prevLogTerm 的最大 index 条目,leader 下次从该 index 对应的条目开始尝试。
func (rf *Raft) RespondAppendEntries(args *AppendEntries, reply *AppendReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.CurrentTerm
if args.Term < rf.CurrentTerm {
return
}
if rf.state == FOLLOWER {
rf.electionTimer = time.Now()
} else {
rf.becomeFollower()
}
rf.CurrentTerm = args.Term
rf.persist()
reply.RetryTerm = -1
reply.RetryIndex = -1
if len(rf.Log) <= args.PrevLogIndex {
reply.Success = false
reply.RetryIndex = len(rf.Log) - 1
return
}
if rf.Log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
if rf.Log[args.PrevLogIndex].Term > args.PrevLogTerm {
for i := args.PrevLogIndex-1; i >= 0; i-- {
if rf.Log[i].Term <= args.PrevLogTerm {
reply.RetryIndex = i
return
}
}
} else {
reply.RetryTerm = rf.Log[args.PrevLogIndex].Term
}
return
}
reply.Success = true
if len(args.Entries) > 0 {
rf.Log = append(rf.Log[:args.PrevLogIndex+1], args.Entries...)
}
rf.persist()
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = min(len(rf.Log)-1, args.LeaderCommit)
rf.commitCond.Broadcast()
}
}
更改后顺利 pass。
优化
虽说是全部通过了,但是速度慢到感人。单单一个 Figure 8 (unreliable) 就要花两分钟上下,不优化不行了。
首先对 Figure 8 (unreliable) 的时间占用进行调试, 发现在千次迭代中起初遍历所有节点并向 leader 添加指令的操作占用时间很长,最高能达到 600ms 以上,但最低值仅有 5ms。
那问题就好说了,出现这种情况原因只能是 Start() 占用了太长时间,而其本身也没有任何复杂操作,肯定是其他操作占用锁阻塞时长,数百毫秒的时长占用也只能是 Candidate 导致的。
更改竞选开始时锁的解锁位置,使竞选过程中也能回复 Start(),并且在选举结束后再次检查当前节点状态,避免出现多个 leader。
func (rf *Raft) becomeCandidate() {
rf.state = CANDIDATE
for !rf.killed() {
rf.CurrentTerm++
rf.VotedFor = rf.me
rf.persist()
rf.mu.Unlock()
resultChan := make(chan bool)
go func() {
time.Sleep(time.Duration(500+(rand.Int63()%100)) * time.Millisecond)
resultChan <- false
}()
go rf.startElection(len(rf.peers), resultChan)
if <-resultChan {
rf.mu.Lock()
if rf.state == CANDIDATE {
rf.becomeLeader()
}
break
} else {
// sleep for a while to update state
time.Sleep(time.Duration(120+(rand.Int63()%80)) * time.Millisecond)
rf.mu.Lock()
if rf.state != CANDIDATE {
break
}
}
}
}
修改后完成一次 3C test 总时长约 2min30s。
具体实现
这个 part 应该叫做 Figure 8 而不是 persistence。
func (rf *Raft) persist() {
// Your code here (3C).
// Example:
// w := new(bytes.Buffer)
// e := labgob.NewEncoder(w)
// e.Encode(rf.xxx)
// e.Encode(rf.yyy)
// raftstate := w.Bytes()
// rf.persister.Save(raftstate, nil)
//log currentTerm voteFor
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.Log)
e.Encode(rf.CurrentTerm)
e.Encode(rf.VotedFor)
raftstate := w.Bytes()
rf.persister.Save(raftstate, nil)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (3C).
// Example:
// r := bytes.NewBuffer(data)
// d := labgob.NewDecoder(r)
// var xxx
// var yyy
// if d.Decode(&xxx) != nil ||
// d.Decode(&yyy) != nil {
// error...
// } else {
// rf.xxx = xxx
// rf.yyy = yyy
// }
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var Log []Log
var CurrentTerm int
var VotedFor int
if d.Decode(&Log) != nil || d.Decode(&CurrentTerm) != nil || d.Decode(&VotedFor) != nil {
DPrintf("error!\n")
} else {
rf.CurrentTerm = CurrentTerm
rf.Log = Log
rf.VotedFor = VotedFor
}
}
Part 3D: log compaction
Part D是思路最清晰同时做起来最难受的一部分。
要做的事情可以分为三步:
- 实现快照的接收与保存。
- 实现 leader 对 follower 节点快照状态的检测以及相关 RPC 处理。
- 处理日志的索引等相关参数。
最痛苦的部分就在于第三步中巨量的越界错误。
具体实现
快照
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (3D).
rf.mu.Lock()
defer rf.mu.Unlock()
if index > 0 {
rf.SnapshotState.SnapshotTerm = rf.Log[index-rf.SnapshotState.SnapshotIndex].Term
rf.Log = rf.Log[index-rf.SnapshotState.SnapshotIndex:]
rf.Log[0] = Log{Term: rf.SnapshotState.SnapshotTerm}
if rf.state == LEADER {
for i := range rf.nextIndex {
if i == rf.me {
continue
}
rf.nextIndex[i] = len(rf.Log) + index
}
}
rf.SnapshotState.SnapshotIndex = index
}
rf.SnapshotState.SnapshotSave = snapshot
rf.persist()
}
接收快照会改变日志的长度, 所以 nextIndex 的长度也要进行相应修改。
类似的,在初始化 nextIndex 时除了 log 的长度,还要加上快照涵盖的日志长度。
func (rf *Raft) becomeLeader() {
rf.state = LEADER
rf.VotedFor = NULL
rf.persist()
rf.nextIndex = make([]int, len(rf.peers))
for i := range rf.nextIndex {
rf.nextIndex[i] = len(rf.Log) + rf.SnapshotState.SnapshotIndex
}
rf.matchIndex = make([]int, len(rf.peers))
go func() {
rf.leaderFirstHeartBeat()
rf.leaderWorker()
}()
}
索引
在 A ~ C 中我们使用了很多日志索引来定位日志,而引入快照之后日志的实际索引会有偏移,偏移值为快照涵盖的日志长度。所以我们需要记录这个长度并对其进行持久化。
例如在判断是否投票时需要对比日志的新旧:
newer := (args.LastLogTerm == rf.Log[len(rf.Log)-1].Term && args.LastLogIndex >= len(rf.Log)-1+rf.SnapshotState.SnapshotIndex) || args.LastLogTerm > rf.Log[len(rf.Log)-1].Term
以及上述提到的 nextIndex的值都需要注意,这部分是整个 lab 最容易出错的地方。
快照的发送
当回退无法找到对应的日志条目时, leader 需要向节点发送快照。
...
else if result {
if reply.RetryIndex >= 0 {
if reply.RetryIndex < rf.SnapshotState.SnapshotIndex {
replyTerm, result := rf.SendInstallSnapshot(i)
if result {
if replyTerm > rf.CurrentTerm {
termMax = false
} else {
rf.nextIndex[i] = logLen + rf.SnapshotState.SnapshotIndex
}
tempMu.Unlock()
}
wg.Done()
return
} else {
rf.nextIndex[i] = reply.RetryIndex + 1
}
}
if reply.RetryTerm >= 0 {
if !rf.leaderFindNextIndex(i, reply.RetryTerm) {
...
// 同上
}
}
}
Follower 的处理按照论文中的图 13 实现即可。
func (rf *Raft) RespondInstallSnapshot(args *SnapshotArgs, reply *SnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.CurrentTerm
if args.Term < rf.CurrentTerm {
return
} else if rf.state != FOLLOWER {
rf.becomeFollower()
} else {
rf.electionTimer = time.Now()
}
if args.LastIncludedIndex < len(rf.Log) && rf.Log[args.LastIncludedIndex].Term == args.LastIncludedTerm {
return
}
rf.Log = make([]Log, 1, 128)
rf.Log[0] = Log{Term: args.LastIncludedTerm}
rf.SnapshotState = SnapshotState{SnapshotSave: args.Data, SnapshotIndex: args.LastIncludedIndex, SnapshotTerm: args.LastIncludedTerm}
rf.applyCh <- ApplyMsg{CommandValid: false, SnapshotValid: true, Snapshot: rf.SnapshotState.SnapshotSave, SnapshotTerm: rf.SnapshotState.SnapshotTerm, SnapshotIndex: rf.SnapshotState.SnapshotIndex}
}
日志的初始项 term 不能设置成 0 了,需要设置成快照 term 来对齐条目的查找。
持久化
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.Log)
e.Encode(rf.CurrentTerm)
e.Encode(rf.VotedFor)
e.Encode(rf.SnapshotState.SnapshotTerm)
e.Encode(rf.SnapshotState.SnapshotIndex)
raftstate := w.Bytes()
if rf.SnapshotState.SnapshotIndex == 0 {
rf.persister.Save(raftstate, nil)
} else {
rf.persister.Save(raftstate, rf.SnapshotState.SnapshotSave)
}
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var Log []Log
var CurrentTerm int
var VotedFor int
var snapshotTerm int
var snapshotIndex int
if d.Decode(&Log) != nil || d.Decode(&CurrentTerm) != nil || d.Decode(&VotedFor) != nil || d.Decode(&snapshotTerm) != nil || d.Decode(&snapshotIndex) != nil {
DPrintf("error!\n")
} else {
rf.CurrentTerm = CurrentTerm
rf.Log = Log
rf.VotedFor = VotedFor
rf.SnapshotState.SnapshotTerm = snapshotTerm
rf.SnapshotState.SnapshotIndex = snapshotIndex
rf.commitIndex = snapshotIndex
}
}
func (rf *Raft) readSnapShot(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
rf.SnapshotState.SnapshotSave = data
}
persister.save() 的第二项参数直接填入快照,节点初始化时调用 rf.readSnapShot(persister.ReadSnapshot()) 即可。
死锁
测试时总是有节点莫名其妙阻塞,Debug 发现测试程序会停止 applyCh 的接收而要求节点接收快照,从而导致死锁。
可以采用 select 方法,在 applyCh 没有接收的时候暂时解锁,记得把 lastApplied 减回去(
func (rf *Raft) apply() {
for !rf.killed() {
rf.mu.Lock()
rf.commitCond.Wait()
if rf.lastApplied < rf.SnapshotState.SnapshotIndex {
rf.lastApplied = rf.SnapshotState.SnapshotIndex
}
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
select {
case rf.applyCh <- ApplyMsg{CommandValid: true, Command: rf.Log[rf.lastApplied-rf.SnapshotState.SnapshotIndex].Command, CommandIndex: rf.lastApplied}:
default:
rf.lastApplied--
rf.mu.Unlock()
time.Sleep(100 * time.Microsecond)
rf.mu.Lock()
}
}
rf.mu.Unlock()
}
}
总结
一路做下来感觉最大的难点还是在于寻找 bug,比如这一部分写出来短短两行,但实际上找出这个 bug 花了我半天时间。所以不仅要熟悉自己的代码,还要理清大部分测试代码的逻辑。
其次要注意锁的粒度,锁中套锁这种低级错误比较容易发现,更需要注意的,同时也是 lab 的 hint 中提到的:不要进行完全不间断的循环。
所有测试最好都用助教提供的脚本进行 2k+ 次,过程中使用 util.go 中提供的 DPrint 记录下日志,有些 bug 说成可遇不可求也毫不夸张。
附单次测试结果:
$ time go test -run 3
Test (3A): initial election ...
... Passed -- 3.5 3 62 15852 0
Test (3A): election after network failure ...
... Passed -- 5.1 3 129 24168 0
Test (3A): multiple elections ...
... Passed -- 7.0 7 740 135792 0
Test (3B): basic agreement ...
... Passed -- 1.1 3 16 4124 3
Test (3B): RPC byte count ...
... Passed -- 2.6 3 48 113064 11
Test (3B): test progressive failure of followers ...
... Passed -- 5.1 3 110 22962 3
Test (3B): test failure of leaders ...
... Passed -- 5.6 3 192 38864 3
Test (3B): agreement after follower reconnects ...
... Passed -- 6.4 3 122 29674 7
Test (3B): no agreement if too many followers disconnect ...
... Passed -- 4.2 5 186 37984 3
Test (3B): concurrent Start()s ...
... Passed -- 0.6 3 8 2052 6
Test (3B): rejoin of partitioned leader ...
... Passed -- 4.8 3 144 31017 4
Test (3B): leader backs up quickly over incorrect follower logs ...
... Passed -- 27.2 5 2123 1621069 102
Test (3B): RPC counts aren't too high ...
... Passed -- 2.2 3 38 10226 12
Test (3C): basic persistence ...
... Passed -- 4.6 3 80 19066 6
Test (3C): more persistence ...
... Passed -- 18.6 5 996 204870 16
Test (3C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.5 3 38 9018 4
Test (3C): Figure 8 ...
... Passed -- 29.2 5 672 127664 18
Test (3C): unreliable agreement ...
... Passed -- 5.6 5 212 69514 246
Test (3C): Figure 8 (unreliable) ...
... Passed -- 40.9 5 3548 9105166 318
Test (3C): churn ...
... Passed -- 16.2 5 904 491678 222
Test (3C): unreliable churn ...
... Passed -- 16.2 5 604 233967 105
Test (3D): snapshots basic ...
... Passed -- 7.3 3 138 45466 210
Test (3D): install snapshots (disconnect) ...
... Passed -- 62.2 3 1408 462099 296
Test (3D): install snapshots (disconnect+unreliable) ...
... Passed -- 71.0 3 1603 494791 352
Test (3D): install snapshots (crash) ...
... Passed -- 41.4 3 784 286346 352
Test (3D): install snapshots (unreliable+crash) ...
... Passed -- 48.1 3 874 321159 354
Test (3D): crash and restart all servers ...
... Passed -- 15.6 3 276 74866 63
Test (3D): snapshot initialization after crash ...
... Passed -- 4.4 3 68 17990 14
PASS
ok 6.5840/raft 459.328s
real 7m39.596s
user 0m9.691s
sys 0m1.889s