MIT 6.5840 (formerly 6.824) Walkthrough Notes: Lab 4 Fault-tolerant Key/Value Service
This lab asks us to build a kvserver on top of the Raft implementation from Lab 3.
Part A: Key/value service without snapshots
kvserver
The Clerk sends each request to the kvserver it currently records as the leader; if the request times out or fails, it looks for the leader again and resends the request.
When a kvserver receives a request, it calls Start() and waits for Raft to reach agreement. Once the committed command has been applied, it replies to the Clerk's RPC.
How does the kvserver side learn that the result has been applied?
We can reuse the history record from Lab 2: whenever a command is committed and executed, an entry is written into history; the RPC handler keeps polling the corresponding entry, and once it is present the command is considered done.
Each new operation's RPC also carries the ID of the previous operation, indicating that the client has already received the previous result. The rest is left to Raft.
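To make this protocol concrete, here is a minimal sketch of the argument and Op types it implies. The field names match the ones used in the code below, but the exact declarations in my common.go and server.go are assumptions and may differ slightly:

// Sketch of the types implied by the dedup protocol above.
type Op struct {
    Operation string // "Get", "Put" or "Append"
    Key       string
    Value     string
    ID        int64 // unique ID of this operation
    LastOpID  int64 // ID of the client's previous op, already acknowledged
}

type GetArgs struct {
    Key      string
    ID       int64
    LastOpID int64
}

type GetReply struct {
    Err   string
    Value string
}

type PutAppendArgs struct {
    Key      string
    Value    string
    ID       int64
    LastOpID int64
}

type PutAppendReply struct {
    Err string
}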
Once this was done, the first test run failed only the second test case, with a timeout error:
test_test.go:419: Operations completed too slowly 100.859572ms/op > 33.333333ms/op
The test requires at least three operations per 100 ms. With my Lab 3 approach of sending AppendEntries at a fixed 100 ms interval this can never pass, so I had to go back and tweak Raft again:
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    index := -1
    term := -1
    isLeader := true
    // Your code here (3B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    term, isLeader = rf.CurrentTerm, rf.state == LEADER
    if !isLeader {
        return index, term, isLeader
    } else {
        index = len(rf.Log) + rf.SnapshotState.SnapshotIndex
        rf.Log = append(rf.Log, Log{Command: command, Term: rf.CurrentTerm})
        rf.persist()
        rf.matchIndex[rf.me] = len(rf.Log) - 1 + rf.SnapshotState.SnapshotIndex
        // Trigger replication almost immediately instead of waiting for the
        // next fixed-interval heartbeat, so ops complete fast enough.
        go func() {
            time.Sleep(1 * time.Millisecond)
            rf.mu.Lock()
            if rf.state != LEADER {
                rf.mu.Unlock()
                return
            }
            rf.sendAppendEntriesToAll()
            rf.mu.Unlock()
        }()
    }
    return index, term, isLeader
}
Implementation
client.go
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    // You'll have to add code here.
    ck.leader = mathRand.Intn(len(servers))
    ck.lastOpID = 0
    return ck
}
func (ck *Clerk) Get(key string) string {
    // You will have to modify this function.
    opId := nrand()
    args := GetArgs{Key: key, ID: opId, LastOpID: ck.lastOpID}
    start := ck.leader
    for {
        reply := GetReply{}
        // Buffered so that whichever goroutine loses the race can still
        // send and return instead of blocking forever.
        resultCh := make(chan bool, 2)
        leader := ck.leader
        var ok bool
        // Timeout goroutine: give up on this server after 100 ms.
        go func() {
            time.Sleep(100 * time.Millisecond)
            resultCh <- false
        }()
        go func() {
            ok = ck.servers[leader].Call("KVServer.Get", &args, &reply)
            resultCh <- true
        }()
        if <-resultCh {
            if ok && reply.Err == "" {
                ck.lastOpID = opId
                return reply.Value
            }
        }
        // Try the next server; after a full round, back off briefly.
        ck.leader = (ck.leader + 1) % len(ck.servers)
        if ck.leader == start {
            time.Sleep(200 * time.Millisecond)
        }
    }
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
    // You will have to modify this function.
    opId := nrand()
    args := PutAppendArgs{Key: key, Value: value, ID: opId, LastOpID: ck.lastOpID}
    start := ck.leader
    for {
        // Buffered for the same reason as in Get: the losing goroutine
        // must not block forever on its send.
        resultCh := make(chan bool, 2)
        var ok bool
        leader := ck.leader
        reply := PutAppendReply{}
        go func() {
            ok = ck.servers[leader].Call("KVServer."+op, &args, &reply)
            resultCh <- true
        }()
        go func() {
            time.Sleep(100 * time.Millisecond)
            resultCh <- false
        }()
        if <-resultCh {
            if ok && reply.Err == "" {
                ck.lastOpID = opId
                break
            }
        }
        ck.leader = (ck.leader + 1) % len(ck.servers)
        if ck.leader == start {
            time.Sleep(200 * time.Millisecond)
        }
    }
}
At initialization the Clerk is assigned a random guess for the leader. When a request fails it moves on to the next server, and once every server has been tried without success it waits a moment before starting another round of retries.
server.go
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    op := Op{Operation: "Get", Key: args.Key, ID: args.ID, LastOpID: args.LastOpID}
    _, _, success := kv.rf.Start(op)
    if !success {
        reply.Err = "not leader"
    } else {
        for !kv.killed() {
            if _, isLeader := kv.rf.GetState(); !isLeader {
                reply.Err = "leader expired"
                return
            }
            kv.mu.Lock()
            if _, ok := kv.history[op.ID]; ok {
                reply.Value = kv.pairs[op.Key]
                kv.mu.Unlock()
                return
            }
            kv.mu.Unlock()
            time.Sleep(1 * time.Millisecond)
        }
    }
}
func (kv *KVServer) apply() {
    for !kv.killed() {
        applyMsg := <-kv.applyCh
        if applyMsg.CommandValid {
            op, _ := applyMsg.Command.(Op)
            kv.mu.Lock()
            if _, ok := kv.history[op.ID]; ok {
                kv.mu.Unlock()
                continue
            }
            delete(kv.history, op.LastOpID)
            switch op.Operation {
            case "Get":
                kv.history[op.ID] = kv.pairs[op.Key]
            case "Put":
                kv.pairs[op.Key] = op.Value
                kv.history[op.ID] = ""
            case "Append":
                kv.pairs[op.Key] += op.Value
                kv.history[op.ID] = ""
            }
            kv.mu.Unlock()
        }
    }
}
A separate goroutine keeps receiving committed commands. As in Lab 2, the server first checks whether a command has already been executed before applying it, to avoid duplicate execution.
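For completeness, here is a minimal sketch of how the apply goroutine gets wired up in StartKVServer for Part A. It follows the lab skeleton, and the map field names are assumptions taken from the code above; the Part B version additionally restores the snapshot and starts MakeSnapshot, as shown later:

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // Register Op so labgob can encode it inside Raft log entries.
    labgob.Register(Op{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)

    // State used by the RPC handlers and the apply loop above.
    kv.pairs = make(map[string]string)
    kv.history = make(map[int64]string)

    // Consume committed entries in the background.
    go kv.apply()
    return kv
}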
Part B: Key/value service with snapshots
Part B asks us to add snapshots to the kvserver. There is not much to say here; it hardly feels like it deserves the "hard" label.
To keep a delayed RPC from applying a log entry twice, deleting a record from history is done in a separate goroutine that waits a while before performing the deletion.
Duplicate application must also be avoided after a restart, so the persisted state has to include the key/value store, the application history, and the last applied index.
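Roughly, the server state for Part B looks like the sketch below. This is an assumption reconstructed from the code that follows rather than the exact declaration; kv.cond is created as sync.NewCond(&kv.mu) so that MakeSnapshot can wait on it while holding the lock:

type KVServer struct {
    mu      sync.Mutex
    cond    *sync.Cond // signalled by apply() so MakeSnapshot can re-check the log size
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32

    maxraftstate int // snapshot once the Raft state grows past ~4/5 of this

    // The three pieces of state that go into a snapshot:
    pairs     map[string]string // the key/value store itself
    history   map[int64]string  // applied-op records used for dedup
    lastIndex int               // index of the last applied log entry
}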
Implementation
func (kv *KVServer) MakeSnapshot(persister *raft.Persister, maxraftstate int) {
    for !kv.killed() {
        kv.mu.Lock()
        kv.cond.Wait()
        if persister.RaftStateSize() > maxraftstate*4/5 {
            w := new(bytes.Buffer)
            e := labgob.NewEncoder(w)
            e.Encode(kv.pairs)
            e.Encode(kv.history)
            e.Encode(kv.lastIndex)
            snapshot := w.Bytes()
            kv.rf.Snapshot(kv.lastIndex, snapshot)
        }
        kv.mu.Unlock()
    }
}
func (kv *KVServer) apply() {
    for !kv.killed() {
        applyMsg := <-kv.applyCh
        if applyMsg.CommandValid {
            op, _ := applyMsg.Command.(Op)
            kv.mu.Lock()
            kv.lastIndex = applyMsg.CommandIndex
            if _, ok := kv.history[op.ID]; ok {
                kv.mu.Unlock()
                continue
            }
            // "1" marks an op that has been applied (used for dedup);
            // "2" marks an acknowledged op scheduled for delayed deletion.
            switch op.Operation {
            case "Get":
                kv.history[op.ID] = "1"
                kv.history[op.LastOpID] = "2"
            case "Put":
                kv.pairs[op.Key] = op.Value
                kv.history[op.ID] = "1"
                kv.history[op.LastOpID] = "2"
            case "Append":
                kv.pairs[op.Key] += op.Value
                kv.history[op.ID] = "1"
                kv.history[op.LastOpID] = "2"
            }
            // Delay the deletion so that a late, duplicated RPC of the
            // previous op can still be recognized and skipped.
            go func(id int64) {
                time.Sleep(100 * time.Millisecond)
                kv.mu.Lock()
                delete(kv.history, id)
                kv.mu.Unlock()
            }(op.LastOpID)
            kv.cond.Broadcast()
            kv.mu.Unlock()
        } else if applyMsg.SnapshotValid {
            kv.mu.Lock()
            r := bytes.NewBuffer(applyMsg.Snapshot)
            d := labgob.NewDecoder(r)
            d.Decode(&kv.pairs)
            d.Decode(&kv.history)
            d.Decode(&kv.lastIndex)
            // Restart the delayed deletion for entries that were already
            // acknowledged when the snapshot was taken.
            for key, value := range kv.history {
                go func(k int64, v string) {
                    time.Sleep(100 * time.Millisecond)
                    if v == "2" {
                        kv.mu.Lock()
                        delete(kv.history, k)
                        kv.mu.Unlock()
                    }
                }(key, value)
            }
            kv.mu.Unlock()
        }
    }
}
In the initialization function:
if persister.ReadSnapshot() != nil && len(persister.ReadSnapshot()) >= 1 {
    r := bytes.NewBuffer(persister.ReadSnapshot())
    d := labgob.NewDecoder(r)
    d.Decode(&kv.pairs)
    d.Decode(&kv.history)
    d.Decode(&kv.lastIndex)
    kv.mu.Lock()
    for key, value := range kv.history {
        go func(k int64, v string) {
            time.Sleep(100 * time.Millisecond)
            if v == "2" {
                kv.mu.Lock()
                delete(kv.history, k)
                kv.mu.Unlock()
            }
        }(key, value)
    }
    kv.mu.Unlock()
}
if maxraftstate >= 0 {
    go kv.MakeSnapshot(persister, maxraftstate)
}
Results of a single test run:
=== RUN TestBasic4A
Test: one client (4A) ...
... Passed -- 15.1 5 10461 1973
--- PASS: TestBasic4A (15.05s)
=== RUN TestSpeed4A
Test: ops complete fast enough (4A) ...
... Passed -- 5.9 3 3127 0
--- PASS: TestSpeed4A (5.92s)
=== RUN TestConcurrent4A
Test: many clients (4A) ...
... Passed -- 15.2 5 14620 2792
--- PASS: TestConcurrent4A (15.24s)
=== RUN TestUnreliable4A
Test: unreliable net, many clients (4A) ...
... Passed -- 17.7 5 4127 452
--- PASS: TestUnreliable4A (17.70s)
=== RUN TestUnreliableOneKey4A
Test: concurrent append to same key, unreliable (4A) ...
... Passed -- 2.4 3 263 52
--- PASS: TestUnreliableOneKey4A (2.39s)
=== RUN TestOnePartition4A
Test: progress in majority (4A) ...
... Passed -- 0.8 5 65 2
Test: no progress in minority (4A) ...
... Passed -- 1.1 5 156 3
Test: completion after heal (4A) ...
... Passed -- 1.0 5 67 3
--- PASS: TestOnePartition4A (3.50s)
=== RUN TestManyPartitionsOneClient4A
Test: partitions, one client (4A) ...
... Passed -- 22.4 5 4920 720
--- PASS: TestManyPartitionsOneClient4A (22.40s)
=== RUN TestManyPartitionsManyClients4A
Test: partitions, many clients (4A) ...
... Passed -- 22.5 5 9930 1466
--- PASS: TestManyPartitionsManyClients4A (22.54s)
=== RUN TestPersistOneClient4A
Test: restarts, one client (4A) ...
... Passed -- 23.1 5 10499 1912
--- PASS: TestPersistOneClient4A (23.08s)
=== RUN TestPersistConcurrent4A
Test: restarts, many clients (4A) ...
... Passed -- 25.1 5 16316 2862
--- PASS: TestPersistConcurrent4A (25.06s)
=== RUN TestPersistConcurrentUnreliable4A
Test: unreliable net, restarts, many clients (4A) ...
... Passed -- 22.9 5 4720 468
--- PASS: TestPersistConcurrentUnreliable4A (22.86s)
=== RUN TestPersistPartition4A
Test: restarts, partitions, many clients (4A) ...
... Passed -- 29.9 5 9971 1417
--- PASS: TestPersistPartition4A (29.88s)
=== RUN TestPersistPartitionUnreliable4A
Test: unreliable net, restarts, partitions, many clients (4A) ...
... Passed -- 28.5 5 4407 297
--- PASS: TestPersistPartitionUnreliable4A (28.47s)
=== RUN TestPersistPartitionUnreliableLinearizable4A
Test: unreliable net, restarts, partitions, random keys, many clients (4A) ...
... Passed -- 32.2 7 11361 444
--- PASS: TestPersistPartitionUnreliableLinearizable4A (32.22s)
=== RUN TestSnapshotRPC4B
Test: InstallSnapshot RPC (4B) ...
labgob warning: Decoding into a non-default variable/field int may not work
... Passed -- 5.0 3 325 63
--- PASS: TestSnapshotRPC4B (4.96s)
=== RUN TestSnapshotSize4B
Test: snapshot size is reasonable (4B) ...
... Passed -- 3.3 3 2467 800
--- PASS: TestSnapshotSize4B (3.30s)
=== RUN TestSpeed4B
Test: ops complete fast enough (4B) ...
... Passed -- 4.1 3 3088 0
--- PASS: TestSpeed4B (4.05s)
=== RUN TestSnapshotRecover4B
Test: restarts, snapshots, one client (4B) ...
... Passed -- 19.8 5 20631 3982
--- PASS: TestSnapshotRecover4B (19.79s)
=== RUN TestSnapshotRecoverManyClients4B
Test: restarts, snapshots, many clients (4B) ...
... Passed -- 20.3 5 96102 18766
--- PASS: TestSnapshotRecoverManyClients4B (20.29s)
=== RUN TestSnapshotUnreliable4B
Test: unreliable net, snapshots, many clients (4B) ...
... Passed -- 16.5 5 4103 462
--- PASS: TestSnapshotUnreliable4B (16.48s)
=== RUN TestSnapshotUnreliableRecover4B
Test: unreliable net, restarts, snapshots, many clients (4B) ...
... Passed -- 22.3 5 4487 438
--- PASS: TestSnapshotUnreliableRecover4B (22.27s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartition4B
Test: unreliable net, restarts, partitions, snapshots, many clients (4B) ...
... Passed -- 28.9 5 4402 271
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4B (28.91s)
=== RUN TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B
Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (4B) ...
... Passed -- 31.5 7 11331 426
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B (31.53s)
PASS