MIT 6.5840 (formerly 6.824) Walkthrough Notes: Lab 4 Fault-tolerant Key/Value Service

This lab asks us to build a kvserver on top of the Raft implementation from Lab 3.

Part A: Key/value service without snapshots

kvserver

The Clerk sends requests to the kvserver it has recorded as the leader; if a request times out or fails, it searches for the leader again and resends the request.

When the kvserver receives a request, it calls Start() and waits for Raft to reach consensus. Once the committed command has been executed, it replies to the Clerk's RPC.

How does the kvserver side know that the result has been applied?

We can reuse the history record idea from Lab 2: after each command is committed and executed, set the corresponding entry in history; the RPC handler keeps checking that entry, and once it is present the command is considered executed.

Every new operation's RPC request also carries the ID of the previous operation, signalling that the client has already received the previous result. The rest is left to Raft.
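
As a reference for the code below, here is a rough sketch of the command and RPC argument types this design assumes. The field names match the handlers shown later, but the exact definitions here are just a reconstruction for illustration.

type Op struct {
	Operation string // "Get", "Put" or "Append"
	Key       string
	Value     string
	ID        int64 // unique id of this operation, generated with nrand()
	LastOpID  int64 // id of the client's previous operation, whose result has been received
}

type GetArgs struct {
	Key      string
	ID       int64
	LastOpID int64
}

type GetReply struct {
	Err   string
	Value string
}

type PutAppendArgs struct {
	Key      string
	Value    string
	ID       int64
	LastOpID int64
}

type PutAppendReply struct {
	Err string
}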

On the first test run after finishing this, only the second test case failed, with a timeout error:

test_test.go:419: Operations completed too slowly 100.859572ms/op > 33.333333ms/op

The test requires at least three operations per 100 ms. With my Lab 3 approach of sending AppendEntries at a fixed 100 ms interval this can never pass, so I had to go back and modify Raft again:

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (3B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	term, isLeader = rf.CurrentTerm, rf.state == LEADER
	if !isLeader {
		return index, term, isLeader
	} else {
		index = len(rf.Log) + rf.SnapshotState.SnapshotIndex
		rf.Log = append(rf.Log, Log{Command: command, Term: rf.CurrentTerm})
		rf.persist()
		rf.matchIndex[rf.me] = len(rf.Log) - 1 + rf.SnapshotState.SnapshotIndex
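		// Kick off replication shortly after appending the entry instead of
		// waiting for the next fixed 100ms heartbeat, so ops complete fast enough.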
		go func() {
			time.Sleep(1 * time.Millisecond)
			rf.mu.Lock()
			if rf.state != LEADER {
				rf.mu.Unlock()
				return
			}
			rf.sendAppendEntriesToAll()
			rf.mu.Unlock()
		}()
	}
	return index, term, isLeader
}

Implementation

client.go

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// You'll have to add code here.
	ck.leader = mathRand.Intn(len(servers))
	ck.lastOpID = 0
	return ck
}
func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	opId := nrand()
	args := GetArgs{Key: key, ID: opId, LastOpID: ck.lastOpID}
	start := ck.leader
	for {
		reply := GetReply{}
		resultCh := make(chan bool, 2) // buffered so the goroutine that loses the race doesn't block forever
		leader := ck.leader
		var ok bool
		go func() {
			time.Sleep(100 * time.Millisecond)
			resultCh <- false
		}()
		go func() {
			ok = ck.servers[leader].Call("KVServer.Get", &args, &reply)
			resultCh <- true
		}()
		if <-resultCh {
			if ok && reply.Err == "" {
				ck.lastOpID = opId
				return reply.Value
			}
		}
		ck.leader = (ck.leader + 1) % len(ck.servers)
		if ck.leader == start {
			time.Sleep(200 * time.Millisecond)
		}
	}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	opId := nrand()
	args := PutAppendArgs{Key: key, Value: value, ID: opId, LastOpID: ck.lastOpID}
	start := ck.leader
	for {
		resultCh := make(chan bool, 2) // buffered so the goroutine that loses the race doesn't block forever
		var ok bool
		leader := ck.leader
		reply := PutAppendReply{}
		go func() {
			ok = ck.servers[leader].Call("KVServer."+op, &args, &reply)
			resultCh <- true
		}()
		go func() {
			time.Sleep(100 * time.Millisecond)
			resultCh <- false
		}()
		if <-resultCh {
			if ok && reply.Err == "" {
				ck.lastOpID = opId
				break
			}
		}
		ck.leader = (ck.leader + 1) % len(ck.servers)
		if ck.leader == start {
			time.Sleep(200 * time.Millisecond)
		}
	}
}

At initialization the Clerk is assigned a random guess for the leader. When a request fails it tries the next server, and once every server has failed it waits a while before starting another round of retries.

server.go

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	op := Op{Operation: "Get", Key: args.Key, ID: args.ID, LastOpID: args.LastOpID}
	_, _, success := kv.rf.Start(op)
	if !success {
		reply.Err = "not leader"
	} else {
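		// Poll until apply() has recorded this op in history, i.e. it has been
		// executed; give up if this server is no longer the leader.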
		for !kv.killed() {
			if _, isLeader := kv.rf.GetState(); !isLeader {
				reply.Err = "leader expired"
				return
			}
			kv.mu.Lock()
			if _, ok := kv.history[op.ID]; ok {
				reply.Value = kv.pairs[op.Key]
				kv.mu.Unlock()
				return
			}
			kv.mu.Unlock()
			time.Sleep(1 * time.Millisecond)
		}
	}
}
func (kv *KVServer) apply() {
	for !kv.killed() {
		applyMsg := <-kv.applyCh
		if applyMsg.CommandValid {
			op, _ := applyMsg.Command.(Op)
			kv.mu.Lock()
			if _, ok := kv.history[op.ID]; ok {
				kv.mu.Unlock()
				continue
			}
			delete(kv.history, op.LastOpID)

			switch op.Operation {
			case "Get":
				kv.history[op.ID] = kv.pairs[op.Key]
			case "Put":
				kv.pairs[op.Key] = op.Value
				kv.history[op.ID] = ""
			case "Append":
				kv.pairs[op.Key] += op.Value
				kv.history[op.ID] = ""
			}
			kv.mu.Unlock()
		}
	}
}

A separate goroutine continuously receives commands from applyCh. As in Lab 2, the server first checks whether a command has already been executed before applying it, so that duplicates are not executed twice.
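
The wiring that starts this goroutine isn't shown above; roughly, my StartKVServer for Part A looks like the sketch below. The signature comes from the lab skeleton, while the initialization of pairs and history is reconstructed from the code above.

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	// register the command type so Raft can serialize it
	labgob.Register(Op{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	// in-memory key/value store and dedup history used by the handlers above
	kv.pairs = make(map[string]string)
	kv.history = make(map[int64]string)

	// background loop that consumes applyCh and executes committed commands
	go kv.apply()
	return kv
}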

Part B: Key/value service with snapshots

Part B asks us to add snapshots to the kvserver. There is not much to say here; I don't think it deserves the "hard" difficulty label.

To keep delayed RPCs from causing a command to be applied twice, entries are not removed from history immediately; instead a separate goroutine deletes them after a short delay.

Duplicate application also has to be avoided after a restart, so the snapshot needs to persist the key/value store, the application history, and the last applied index.

Implementation

func (kv *KVServer) MakeSnapshot(persister *raft.Persister, maxraftstate int) {
	for !kv.killed() {
		kv.mu.Lock()
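		// Wait until apply() broadcasts after executing a command, then
		// snapshot once the Raft state grows close to maxraftstate.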
		kv.cond.Wait()
		if persister.RaftStateSize() > maxraftstate*4/5 {
			w := new(bytes.Buffer)
			e := labgob.NewEncoder(w)
			e.Encode(kv.pairs)
			e.Encode(kv.history)
			e.Encode(kv.lastIndex)
			snapshot := w.Bytes()
			kv.rf.Snapshot(kv.lastIndex, snapshot)
		}
		kv.mu.Unlock()
	}
}
func (kv *KVServer) apply() {
	for !kv.killed() {
		applyMsg := <-kv.applyCh
		if applyMsg.CommandValid {
			op, _ := applyMsg.Command.(Op)
			kv.mu.Lock()
			kv.lastIndex = applyMsg.CommandIndex
			if _, ok := kv.history[op.ID]; ok {
				kv.mu.Unlock()
				continue
			}
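			// history markers: "1" = this op has been applied; "2" = the previous,
			// client-acknowledged op, which is deleted after a delay below.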
			switch op.Operation {
			case "Get":
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			case "Put":
				kv.pairs[op.Key] = op.Value
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			case "Append":
				kv.pairs[op.Key] += op.Value
				kv.history[op.ID] = "1"
				kv.history[op.LastOpID] = "2"
			}

			go func(id int64) {
				time.Sleep(100 * time.Millisecond)
				kv.mu.Lock()
				delete(kv.history, id)
				kv.mu.Unlock()
			}(op.LastOpID)
			kv.cond.Broadcast()
			kv.mu.Unlock()
		} else if applyMsg.SnapshotValid {
			kv.mu.Lock()
			r := bytes.NewBuffer(applyMsg.Snapshot)
			d := labgob.NewDecoder(r)
			d.Decode(&kv.pairs)
			d.Decode(&kv.history)
			d.Decode(&kv.lastIndex)
			for key, value := range kv.history {
				go func(k int64, v string) {
					time.Sleep(100 * time.Millisecond)
					if v == "2" {
						kv.mu.Lock()
						delete(kv.history, k)
						kv.mu.Unlock()
					}
				}(key, value)

			}
			kv.mu.Unlock()
		}
	}
}

In the initialization function:

if snapshot := persister.ReadSnapshot(); len(snapshot) >= 1 {
		r := bytes.NewBuffer(snapshot)
		d := labgob.NewDecoder(r)
		d.Decode(&kv.pairs)
		d.Decode(&kv.history)
		d.Decode(&kv.lastIndex)
		kv.mu.Lock()
		for key, value := range kv.history {
			go func(k int64, v string) {
				time.Sleep(100 * time.Millisecond)
				if v == "2" {
					kv.mu.Lock()
					delete(kv.history, k)
					kv.mu.Unlock()
				}
			}(key, value)
		}
		kv.mu.Unlock()
	}
	if maxraftstate >= 0 {
		go kv.MakeSnapshot(persister, maxraftstate)
	}
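
The snapshot decoding logic now appears twice, in the InstallSnapshot branch of apply() and here at startup. It could be factored into a small helper along these lines (readSnapshot is a hypothetical name, and the caller is assumed to hold kv.mu where necessary):

func (kv *KVServer) readSnapshot(data []byte) {
	if len(data) < 1 {
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	// restore the key/value store, the dedup history and the last applied index
	d.Decode(&kv.pairs)
	d.Decode(&kv.history)
	d.Decode(&kv.lastIndex)
}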

Test results (single run)

=== RUN   TestBasic4A
Test: one client (4A) ...
  ... Passed --  15.1  5 10461 1973
--- PASS: TestBasic4A (15.05s)
=== RUN   TestSpeed4A
Test: ops complete fast enough (4A) ...
  ... Passed --   5.9  3  3127    0
--- PASS: TestSpeed4A (5.92s)
=== RUN   TestConcurrent4A
Test: many clients (4A) ...
  ... Passed --  15.2  5 14620 2792
--- PASS: TestConcurrent4A (15.24s)
=== RUN   TestUnreliable4A
Test: unreliable net, many clients (4A) ...
  ... Passed --  17.7  5  4127  452
--- PASS: TestUnreliable4A (17.70s)
=== RUN   TestUnreliableOneKey4A
Test: concurrent append to same key, unreliable (4A) ...
  ... Passed --   2.4  3   263   52
--- PASS: TestUnreliableOneKey4A (2.39s)
=== RUN   TestOnePartition4A
Test: progress in majority (4A) ...
  ... Passed --   0.8  5    65    2
Test: no progress in minority (4A) ...
  ... Passed --   1.1  5   156    3
Test: completion after heal (4A) ...
  ... Passed --   1.0  5    67    3
--- PASS: TestOnePartition4A (3.50s)
=== RUN   TestManyPartitionsOneClient4A
Test: partitions, one client (4A) ...
  ... Passed --  22.4  5  4920  720
--- PASS: TestManyPartitionsOneClient4A (22.40s)
=== RUN   TestManyPartitionsManyClients4A
Test: partitions, many clients (4A) ...
  ... Passed --  22.5  5  9930 1466
--- PASS: TestManyPartitionsManyClients4A (22.54s)
=== RUN   TestPersistOneClient4A
Test: restarts, one client (4A) ...
  ... Passed --  23.1  5 10499 1912
--- PASS: TestPersistOneClient4A (23.08s)
=== RUN   TestPersistConcurrent4A
Test: restarts, many clients (4A) ...
  ... Passed --  25.1  5 16316 2862
--- PASS: TestPersistConcurrent4A (25.06s)
=== RUN   TestPersistConcurrentUnreliable4A
Test: unreliable net, restarts, many clients (4A) ...
  ... Passed --  22.9  5  4720  468
--- PASS: TestPersistConcurrentUnreliable4A (22.86s)
=== RUN   TestPersistPartition4A
Test: restarts, partitions, many clients (4A) ...
  ... Passed --  29.9  5  9971 1417
--- PASS: TestPersistPartition4A (29.88s)
=== RUN   TestPersistPartitionUnreliable4A
Test: unreliable net, restarts, partitions, many clients (4A) ...
  ... Passed --  28.5  5  4407  297
--- PASS: TestPersistPartitionUnreliable4A (28.47s)
=== RUN   TestPersistPartitionUnreliableLinearizable4A
Test: unreliable net, restarts, partitions, random keys, many clients (4A) ...
  ... Passed --  32.2  7 11361  444
--- PASS: TestPersistPartitionUnreliableLinearizable4A (32.22s)
=== RUN   TestSnapshotRPC4B
Test: InstallSnapshot RPC (4B) ...
labgob warning: Decoding into a non-default variable/field int may not work
  ... Passed --   5.0  3   325   63
--- PASS: TestSnapshotRPC4B (4.96s)
=== RUN   TestSnapshotSize4B
Test: snapshot size is reasonable (4B) ...
  ... Passed --   3.3  3  2467  800
--- PASS: TestSnapshotSize4B (3.30s)
=== RUN   TestSpeed4B
Test: ops complete fast enough (4B) ...
  ... Passed --   4.1  3  3088    0
--- PASS: TestSpeed4B (4.05s)
=== RUN   TestSnapshotRecover4B
Test: restarts, snapshots, one client (4B) ...
  ... Passed --  19.8  5 20631 3982
--- PASS: TestSnapshotRecover4B (19.79s)
=== RUN   TestSnapshotRecoverManyClients4B
Test: restarts, snapshots, many clients (4B) ...
  ... Passed --  20.3  5 96102 18766
--- PASS: TestSnapshotRecoverManyClients4B (20.29s)
=== RUN   TestSnapshotUnreliable4B
Test: unreliable net, snapshots, many clients (4B) ...
  ... Passed --  16.5  5  4103  462
--- PASS: TestSnapshotUnreliable4B (16.48s)
=== RUN   TestSnapshotUnreliableRecover4B
Test: unreliable net, restarts, snapshots, many clients (4B) ...
  ... Passed --  22.3  5  4487  438
--- PASS: TestSnapshotUnreliableRecover4B (22.27s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartition4B
Test: unreliable net, restarts, partitions, snapshots, many clients (4B) ...
  ... Passed --  28.9  5  4402  271
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartition4B (28.91s)
=== RUN   TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B
Test: unreliable net, restarts, partitions, snapshots, random keys, many clients (4B) ...
  ... Passed --  31.5  7 11331  426
--- PASS: TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable4B (31.53s)
PASS