MIT 6.5840 (formerly 6.824) Walkthrough Notes: Lab 4 Fault-tolerant Key/Value Service
This lab asks us to build a kvserver on top of the Raft implementation from Lab 3.
Part A: Key/value service without snapshots
kvserver
The Clerk sends each request to the server it currently believes is the leader; if the request times out or fails, it looks for the leader again and retries.
When a kvserver receives a request, it calls Start() and waits for Raft to reach agreement. Once the committed command has been applied, it replies to the Clerk's RPC.
How does the kvserver learn that the result has been applied?
We can reuse the history table from Lab 2: whenever a command is committed and executed, the apply loop writes an entry into history, and the RPC handler keeps polling that entry, treating the command as finished once the entry is present.
Every new operation's RPC also carries the ID of the client's previous operation, indicating that the client has already received that operation's result, so the server can drop its record. The rest is left to Raft.
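For reference, here is roughly what the operation and RPC argument types look like under this scheme. The post never shows these definitions, so the layout below is only a sketch, with field names inferred from how they are used in the code further down.

// Sketch only: field names inferred from the handlers and apply loop below.
type Op struct {
	Operation string // "Get", "Put" or "Append"
	Key       string
	Value     string
	ID        int64 // unique ID of this operation (from nrand())
	LastOpID  int64 // ID of the client's previous, already-acknowledged operation
}

type GetArgs struct {
	Key      string
	ID       int64
	LastOpID int64
}

type GetReply struct {
	Err   string
	Value string
}

type PutAppendArgs struct {
	Key      string
	Value    string
	ID       int64
	LastOpID int64
}

type PutAppendReply struct {
	Err string
}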
After finishing this, my first test run had only one failure, a timeout in the second test case:
test_test.go:419: Operations completed too slowly 100.859572ms/op > 33.333333ms/op
The test requires at least three operations per 100 ms. With my Lab 3 approach of sending AppendEntries only on a fixed 100 ms heartbeat, that can't possibly pass, so it was back to modifying Raft: Start() now triggers an extra round of AppendEntries right after appending the new entry instead of waiting for the next heartbeat.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (3B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	term, isLeader = rf.CurrentTerm, rf.state == LEADER
	if !isLeader {
		return index, term, isLeader
	} else {
		index = len(rf.Log) + rf.SnapshotState.SnapshotIndex
		rf.Log = append(rf.Log, Log{Command: command, Term: rf.CurrentTerm})
		rf.persist()
		rf.matchIndex[rf.me] = len(rf.Log) - 1 + rf.SnapshotState.SnapshotIndex
		// Kick off replication almost immediately instead of waiting for
		// the next fixed 100ms heartbeat.
		go func() {
			time.Sleep(1 * time.Millisecond)
			rf.mu.Lock()
			if rf.state != LEADER {
				rf.mu.Unlock()
				return
			}
			rf.sendAppendEntriesToAll()
			rf.mu.Unlock()
		}()
	}
	return index, term, isLeader
}
Implementation
client.go
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.leader = mathRand.Intn(len(servers)) // start with a random guess at the leader
	ck.lastOpID = 0
	return ck
}

func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	opId := nrand()
	args := GetArgs{Key: key, ID: opId, LastOpID: ck.lastOpID}
	start := ck.leader
	for {
		reply := GetReply{}
		resultCh := make(chan bool)
		leader := ck.leader
		var ok bool
		// Race the RPC against a 100ms timeout.
		go func() {
			time.Sleep(100 * time.Millisecond)
			resultCh <- false
		}()
		go func() {
			ok = ck.servers[leader].Call("KVServer.Get", &args, &reply)
			resultCh <- true
		}()
		if <-resultCh {
			if ok && reply.Err == "" {
				ck.lastOpID = opId
				return reply.Value
			}
		}
		// Try the next server; after a full round, back off briefly.
		ck.leader = (ck.leader + 1) % len(ck.servers)
		if ck.leader == start {
			time.Sleep(200 * time.Millisecond)
		}
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	opId := nrand()
	args := PutAppendArgs{Key: key, Value: value, ID: opId, LastOpID: ck.lastOpID}
	start := ck.leader
	for {
		resultCh := make(chan bool)
		var ok bool
		leader := ck.leader
		reply := PutAppendReply{}
		go func() {
			ok = ck.servers[leader].Call("KVServer."+op, &args, &reply)
			resultCh <- true
		}()
		go func() {
			time.Sleep(100 * time.Millisecond)
			resultCh <- false
		}()
		if <-resultCh {
			if ok && reply.Err == "" {
				ck.lastOpID = opId
				break
			}
		}
		ck.leader = (ck.leader + 1) % len(ck.servers)
		if ck.leader == start {
			time.Sleep(200 * time.Millisecond)
		}
	}
}
At initialization the Clerk picks a random server as its presumed leader; when a request fails it moves on to the next server, and once every server has been tried it waits a bit before starting another round of retries.
server.go
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	op := Op{Operation: "Get", Key: args.Key, ID: args.ID, LastOpID: args.LastOpID}
	_, _, success := kv.rf.Start(op)
	if !success {
		reply.Err = "not leader"
	} else {
		// Poll until the apply loop records this operation in history.
		for !kv.killed() {
			if _, isLeader := kv.rf.GetState(); !isLeader {
				reply.Err = "leader expired"
				return
			}
			kv.mu.Lock()
			if _, ok := kv.history[op.ID]; ok {
				reply.Value = kv.pairs[op.Key]
				kv.mu.Unlock()
				return
			}
			kv.mu.Unlock()
			time.Sleep(1 * time.Millisecond)
		}
	}
}

func (kv *KVServer) apply() {
	for !kv.killed() {
		applyMsg := <-kv.applyCh
		if applyMsg.CommandValid {
			op, _ := applyMsg.Command.(Op)
			kv.mu.Lock()
			// Skip operations that have already been executed.
			if _, ok := kv.history[op.ID]; ok {
				kv.mu.Unlock()
				continue
			}
			// The client has acknowledged its previous operation, so drop its record.
			delete(kv.history, op.LastOpID)
			switch op.Operation {
			case "Get":
				kv.history[op.ID] = kv.pairs[op.Key]
			case "Put":
				kv.pairs[op.Key] = op.Value
				kv.history[op.ID] = ""
			case "Append":
				kv.pairs[op.Key] += op.Value
				kv.history[op.ID] = ""
			}
			kv.mu.Unlock()
		}
	}
}
A separate goroutine keeps receiving applied commands. As in Lab 2, the server checks whether a command has already been executed before applying it, to avoid duplicate execution.
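The Put and Append RPC handlers aren't shown above; they follow the same Start-then-poll pattern as Get. A minimal sketch, assuming the same types and fields as the code above (Append differs only in the Operation field):

func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
	// Sketch: mirrors the Get handler, just without returning a value.
	op := Op{Operation: "Put", Key: args.Key, Value: args.Value, ID: args.ID, LastOpID: args.LastOpID}
	_, _, success := kv.rf.Start(op)
	if !success {
		reply.Err = "not leader"
		return
	}
	for !kv.killed() {
		if _, isLeader := kv.rf.GetState(); !isLeader {
			reply.Err = "leader expired"
			return
		}
		kv.mu.Lock()
		if _, ok := kv.history[op.ID]; ok {
			// The apply loop has executed this operation.
			kv.mu.Unlock()
			return
		}
		kv.mu.Unlock()
		time.Sleep(1 * time.Millisecond)
	}
}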
Part B: Key/value service with snapshots
Part B asks us to add snapshots to the kvserver. There isn't much to say; it hardly feels worth the "hard" label.
To keep a delayed RPC from causing a log entry to be applied twice, entries are not removed from history right away: a separate goroutine waits for a while before deleting them.
Duplicate application also has to be avoided after a restart, so the snapshot persists the key/value store, the application history, and the last applied index.
Implementation
func (kv *KVServer) MakeSnapshot(persister *raft.Persister, maxraftstate int) {
	for !kv.killed() {
		kv.mu.Lock()
		kv.cond.Wait()
		// Snapshot once the Raft state grows past 4/5 of the limit.
		if persister.RaftStateSize() > maxraftstate*4/5 {
			w := new(bytes.Buffer)
			e := labgob.NewEncoder(w)
			e.Encode(kv.pairs)
			e.Encode(kv.history)
			e.Encode(kv.lastIndex)
			snapshot := w.Bytes()
			kv.rf.Snapshot(kv.lastIndex, snapshot)
		}
		kv.mu.Unlock()
	}
}

func (kv *KVServer) apply() {
	for !kv.killed() {
		applyMsg := <-kv.applyCh
		if applyMsg.CommandValid {
			op, _ := applyMsg.Command.(Op)
			kv.mu.Lock()
			kv.lastIndex = applyMsg.CommandIndex
			if _, ok := kv.history[op.ID]; ok {
				kv.mu.Unlock()
				continue
			}
			switch op.Operation {
			case "Get":
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			case "Put":
				kv.pairs[op.Key] = op.Value
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			case "Append":
				kv.pairs[op.Key] += op.Value
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			}
			// Delete the acknowledged record only after a delay, so a delayed
			// duplicate RPC still hits the dedup check above.
			go func(id int64) {
				time.Sleep(100 * time.Millisecond)
				kv.mu.Lock()
				delete(kv.history, id)
				kv.mu.Unlock()
			}(op.LastOpID)
			kv.cond.Broadcast()
			kv.mu.Unlock()
		} else if applyMsg.SnapshotValid {
			// Replace local state with the snapshot's contents.
			kv.mu.Lock()
			r := bytes.NewBuffer(applyMsg.Snapshot)
			d := labgob.NewDecoder(r)
			d.Decode(&kv.pairs)
			d.Decode(&kv.history)
			d.Decode(&kv.lastIndex)
			for key, value := range kv.history {
				go func(k int64, v string) {
					time.Sleep(100 * time.Millisecond)
					if v == "2" {
						kv.mu.Lock()
						delete(kv.history, k)
						kv.mu.Unlock()
					}
				}(key, value)
			}
			kv.mu.Unlock()
		}
	}
}
In the initialization function:
if persister.ReadSnapshot() != nil && len(persister.ReadSnapshot()) >= 1 {
	r := bytes.NewBuffer(persister.ReadSnapshot())
	d := labgob.NewDecoder(r)
	d.Decode(&kv.pairs)
	d.Decode(&kv.history)
	d.Decode(&kv.lastIndex)
	kv.mu.Lock()
	for key, value := range kv.history {
		go func(k int64, v string) {
			time.Sleep(100 * time.Millisecond)
			if v == "2" {
				kv.mu.Lock()
				delete(kv.history, k)
				kv.mu.Unlock()
			}
		}(key, value)
	}
	kv.mu.Unlock()
}
if maxraftstate >= 0 {
	go kv.MakeSnapshot(persister, maxraftstate)
}
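For completeness, the rest of StartKVServer roughly wires things up as below. Only the snapshot-restore part is shown in the snippet above, so everything else here is a sketch based on the fields used earlier and the standard lab skeleton (kv.me, kv.maxraftstate, etc. are assumptions).

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// labgob needs to know how to encode the command type.
	labgob.Register(Op{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.pairs = make(map[string]string)
	kv.history = make(map[int64]string)
	kv.cond = sync.NewCond(&kv.mu) // apply() broadcasts, MakeSnapshot() waits

	// ... restore from persister.ReadSnapshot() and start the delayed
	// history cleanup, as in the snippet above ...

	go kv.apply()
	if maxraftstate >= 0 {
		go kv.MakeSnapshot(persister, maxraftstate)
	}
	return kv
}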
Results of a single test run:
=== RUN   TestBasic4A
Test: one client (4A) ...
  ... Passed -- 15.1  5 10461 1973
--- PASS: TestBasic4A (15.05s)
=== RUN   TestSpeed4A
Test: ops complete fast enough (4A) ...
  ... Passed --  5.9  3  3127    0
--- PASS: TestSpeed4A (5.92s)
=== RUN   TestConcurrent4A
Test: many clients (4A) ...
  ... Passed -- 15.2  5 14620 2792
--- PASS: TestConcurrent4A (15.24s)
=== RUN   TestUnreliable4A
Test: unreliable net, many clients (4A) ...
  ... Passed -- 17.7  5  4127  452
--- PASS: TestUnreliable4A (17.70s)
=== RUN   TestUnreliableOneKey4A
Test: concurrent append to same key, unreliable (4A) ...
  ... Passed --  2.4  3   263   52
--- PASS: TestUnreliableOneKey4A (2.39s)
=== RUN   TestOnePartition4A
Test: progress in majority (4A) ...
  ... Passed --  0.8  5    65    2
Test: no progress in minority (4A) ...
  ... Passed --  1.1  5   156    3
Test: completion after heal (4A) ...
  ... Passed --  1.0  5    67    3
--- PASS: TestOnePartition4A (3.50s)
=== RUN   TestManyPartitionsOneClient4A
Test: partitions, one client (4A) ...
  ... Passed -- 22.4  5  4920  720
--- PASS: TestManyPartitionsOneClient4A (22.40s)
=== RUN   TestManyPartitionsManyClients4A
Test: partitions, many clients (4A) ...
  ... Passed -- 22.5  5  9930 1466
--- PASS: TestManyPartitionsManyClients4A (22.54s)
=== RUN   TestPersistOneClient4A
Test: restarts, one client (4A) ...
  ... Passed -- 23.1  5 10499 1912
--- PASS: TestPersistOneClient4A (23.08s)
=== RUN   TestPersistConcurrent4A
Test: restarts, many clients (4A) ...
  ... Passed -- 25.1  5 16316 2862
--- PASS: TestPersistConcurrent4A (25.06s)
=== RUN   TestPersistConcurrentUnreliable4A
Test: unreliable net, restarts, many clients (4A) ...
  ... Passed -- 22.9  5  4720  468
--- PASS: TestPersistConcurrentUnreliable4A (22.86s)
=== RUN   TestPersistPartition4A
Test: restarts, partitions, many clients (4A) ...
  ... Passed -- 29.9  5  9971 1417
--- PASS: TestPersistPartition4A (29.88s)
=== RUN   TestPersistPartitionUnreliable4A
Test: unreliable net, restarts, partitions, many clients (4A) ...
  ... Passed -- 28.5  5  4407  297
--- PASS: TestPersistPartitionUnreliable4A (28.47s)
=== RUN   TestPersistPartitionUnreliableLinearizable4A
Test: unreliable net, restarts, partitions, random keys, many clients (4A) ...
  ... Passed -- 32.2  7 11361  444
--- PASS: TestPersistPartitionUnreliableLinearizable4A (32.22s)
=== RUN   TestSnapshotRPC4B
Test: InstallSnapshot RPC (4B) ...
labgob warning: Decoding into a non-default variable/field int may not work
  ... Passed --  5.0  3   325   63
--- PASS: TestSnapshotRPC4B (4.96s)
=== RUN   TestSnapshotSize4B
Test: snapshot size is reasonable (4B) ...
  ... Passed --  3.3  3  2467  800
--- PASS: TestSnapshotSize4B (3.30s)
=== RUN   TestSpeed4B
Test: ops complete fast enough (4B) ...
  ... Passed --  4.1  3  3088    0
--- PASS: TestSpeed4B (4.05s)
=== RUN   TestSnapshotRecover4B
Test: restarts, snapshots, one client (4B) ...
  ... Passed -- 19.8  5 20631 3982
--- PASS: TestSnapshotRecover4B (19.79s)
=== RUN   TestSnapshotRecoverManyClients4B
Test: restarts, snapshots, many clients (4B) ...
  ... Passed -- 20.3  5 96102 18766
--- PASS: TestSnapshotRecoverManyClients4B (20.29s)
=== RUN   TestSnapshotUnreliable4B
Test: unreliable net, snapshots, many clients (4B) ...
  ... Passed -- 16.5  5  4103  462
--- PASS: TestSnapshotUnreliable4B (16.48s)
=== RUN   TestSnapshotUnreliableRecover4B
Test: unreliable net, restarts, snapshots, many clients (4B) ...
  ... Passed -- 22.3  5  4487  438
--- PASS: TestSnapshotUnreliableRecover4B (22.27s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartition4B
Test: unreliable net, restarts, partitions, snapshots, many clients (4B) ...
  ... Passed -- 28.9  5  4402  271
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4B (28.91s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B
Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (4B) ...
  ... Passed -- 31.5  7 11331  426
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B (31.53s)
PASS