最后的 Lab 要求我们实现一个分片 k/v 存储系统。
Part A: The Controller and Static Sharding
大概的框架直接照抄 lab4 就可以,麻烦的点在于如何平均分配切片并尽可能减少移动。
注意 golang 的 map 存储的是引用,所以新建 configuration 时要新建一个 map。
另外 golang 的 map 也是哈希的,所以 for k, v := range 遍历 map 会导致各节点的分片不同,要保证每次遍历 map 都是按照一定顺序的。
切片的分配
首先分析一下:平均分配意味着每组会分配到 shardNum := len(conf.Shards) / len(conf.Groups) 个分片,而其中 toleranceNum := len(conf.Shards) % len(conf.Groups) 个组会多分配到一个分片。
而为了尽可能减少分片的移动,可以将分片数大于 shardNum 的节点多出来的分片以及 0 的分片标记为 待移出
,而分片数少于 shardNum 的节点则标记缺少量个数的待补充
。当 toleranceNum > 0 时可以“容忍” toleranceNum 个节点多一个分片。
标记完成后如果容忍的节点数仍小于 toleranceNum,则让差值数量的待补充
节点额外承担切片。
最后将 待移出
的分片分配到待补充
节点即可。
代码实现如下:
func reorder(conf *Config) {
if len(conf.Groups) == 0 {
return
}
shardNum := len(conf.Shards) / len(conf.Groups)
remainNum := len(conf.Shards) % len(conf.Groups)
shardSingal := make(map[int]int)
gidRemove := make([]int, 0, 5)
gidAdd := make([]int, 0, 5)
for key := range conf.Groups {
shardSingal[key] = shardNum
}
shardSingal[0] = 0
toleranceNum := remainNum
for _, val := range conf.Shards {
if _, ok := shardSingal[val]; ok {
shardSingal[val]--
}
}
keysSorted := make([]int, 0, len(shardSingal))
for key := range shardSingal {
keysSorted = append(keysSorted, key)
}
sort.Ints(keysSorted)
for _, key := range keysSorted {
for shardSingal[key] < 0 {
if shardSingal[key] == -1 && toleranceNum > 0 && key != 0 {
toleranceNum--
break
}
gidRemove = append(gidRemove, key)
shardSingal[key]++
}
for shardSingal[key] > 0 {
gidAdd = append(gidAdd, key)
shardSingal[key]--
}
}
if shardNum == 0 && toleranceNum > 0 {
for _, key := range keysSorted {
if toleranceNum <= 0 {
break
}
if key != 0 && shardSingal[key] == 0 {
gidAdd = append(gidAdd, key)
toleranceNum--
}
}
}
end := len(gidAdd)
for ptr := 0; ptr < end && toleranceNum > 0; ptr++ {
if ptr+1 < end && gidAdd[ptr] == gidAdd[ptr+1] {
continue
}
gidAdd = append(gidAdd, gidAdd[ptr])
toleranceNum--
}
ptr := 0
for i := 0; i < len(conf.Shards) && ptr < len(gidRemove); i++ {
if conf.Shards[i] == gidRemove[ptr] {
conf.Shards[i] = gidAdd[ptr]
ptr++
}
}
}
具体实现
都做到这里了实现这些东西肯定是很轻松的,没什么好讲的。
func (sc *ShardCtrler) apply() {
for {
applyMsg := <-sc.applyCh
if applyMsg.CommandValid {
op, _ := applyMsg.Command.(Op)
sc.mu.Lock()
if _, ok := sc.history[op.ID]; ok {
sc.mu.Unlock()
continue
}
switch op.Operation {
case "Join":
lastConfig := sc.configs[len(sc.configs)-1]
lastConfig.Groups = map[int][]string{}
for k, v := range sc.configs[len(sc.configs)-1].Groups {
lastConfig.Groups[k] = v
}
for k, v := range op.Servers {
lastConfig.Groups[k] = v
}
lastConfig.Num = len(sc.configs)
reorder(&lastConfig)
sc.configs = append(sc.configs, lastConfig)
sc.lastApplied = "Join"
sc.history[op.ID] = "1"
case "Leave":
lastConfig := sc.configs[len(sc.configs)-1]
lastConfig.Num = len(sc.configs)
lastConfig.Groups = map[int][]string{}
for k, v := range sc.configs[len(sc.configs)-1].Groups {
lastConfig.Groups[k] = v
}
for _, val := range op.GIDs {
for i, v := range lastConfig.Shards {
if v == val {
lastConfig.Shards[i] = 0
}
}
delete(lastConfig.Groups, val)
}
reorder(&lastConfig)
sc.configs = append(sc.configs, lastConfig)
sc.lastApplied = "Leave"
sc.history[op.ID] = "1"
case "Move":
if !(sc.lastApplied == "Join" || sc.lastApplied == "Leave") {
sc.lastApplied = "Move"
lastConfig := sc.configs[len(sc.configs)-1]
lastConfig.Shards[op.Shard] = op.GID
lastConfig.Num = len(sc.configs)
sc.configs = append(sc.configs, lastConfig)
}
sc.history[op.ID] = "1"
case "Query":
sc.lastApplied = "Query"
sc.history[op.ID] = "1"
}
delete(sc.history, op.LastOpID)
sc.mu.Unlock()
}
}
}
Part B: Shard Movement
首先将 lab4 的代码 copy 过来,就可以 pass 第一个 test。
在每次接收请求后判断分片是否正确,不正确则直接返回,实现后即可 pass 第二个 test。
分片的迁移
采用向之前持有分片的组发送 RPC 请求。
获取分片时肯定是向每组的 leader 发送,但如果组内的每个 server 都发送 RPC 的话比较复杂,并且不好处理 challenge1 中的删除分片。思考了一下,应当让每组中只有 leader 不断检测,其他节点的作用单纯是在分布式系统保持数据一致性。
起一个协程不断查询是否有更新的 configuration,如果有更新且待接收的分片
为空时将其写入 raft 通知组内所有节点更新配置。应用新的配置时找出需要发送的分片
和待接收的分片
。
func (kv *ShardKV) poll() {
_, isLeader := kv.rf.GetState()
kv.mu.Lock()
if !isLeader || len(kv.newShard) > 0 {
kv.mu.Unlock()
return
}
next := kv.conf.Num + 1
kv.mu.Unlock()
conf := kv.mck.Query(next)
if conf.Num == next {
kv.rf.Start(conf)
}
}
func (kv *ShardKV) updateDataShard(conf shardctrler.Config) {
kv.mu.Lock()
defer kv.mu.Unlock()
if conf.Num <= kv.conf.Num {
return
}
lastConf, outShard := kv.conf, kv.authFlags
kv.authFlags, kv.conf = make(map[int]bool), conf
for shard, gid := range conf.Shards {
if gid != kv.gid {
continue
}
if _, ok := outShard[shard]; ok || lastConf.Num == 0 {
kv.authFlags[shard] = true
delete(outShard, shard)
} else {
kv.newShard[shard] = lastConf.Num
}
}
if len(outShard) > 0 {
kv.outShards[lastConf.Num] = make(map[int]map[string]string)
for shard := range outShard {
targetPairs := make(map[string]string)
for k, v := range kv.pairs {
if key2shard(k) == shard {
targetPairs[k] = v
}
}
kv.outShards[lastConf.Num][shard] = targetPairs
}
}
}
再起一个协程不断检测是否有待接收的分片
,若有则向对应组发送获取分片请求。
func (kv *ShardKV) getDataShard() {
for !kv.killed() {
_, isLeader := kv.rf.GetState()
kv.mu.Lock()
if !isLeader || len(kv.newShard) == 0 {
kv.mu.Unlock()
time.Sleep(50 * time.Millisecond)
continue
}
var wg sync.WaitGroup
for shard, num := range kv.newShard {
wg.Add(1)
go func(shard int, conf shardctrler.Config) {
kv.sendShardRequest(shard, conf)
wg.Done()
}(shard, kv.mck.Query(num))
}
kv.mu.Unlock()
wg.Wait()
time.Sleep(50 * time.Millisecond)
}
}
func (kv *ShardKV) sendShardRequest(shard int, conf shardctrler.Config) {
args := ShardArgs{Shard: shard, Num: conf.Num}
for _, srv := range conf.Groups[conf.Shards[shard]] {
srv := kv.make_end(srv)
reply := ShardReply{}
ok := srv.Call("ShardKV.ReceiveShardRequest", &args, &reply)
if ok && reply.Err == OK {
kv.rf.Start(ShardMove{Pairs: reply.AppendPairs, Sequence: reply.Sequence, Shard: shard, Num: conf.Num})
return
}
}
}
成功后将新分片数据写入 raft 共享给全组。
快照
因为要保证每次重启从上次快照点重新应用日志能够完全还原系统状态,像是待移出的切片等数据也要保存在快照中。
func (kv *ShardKV) MakeSnapshot(persister *raft.Persister, maxraftstate int) {
for !kv.killed() {
kv.mu.Lock()
kv.cond.Wait()
if persister.RaftStateSize() > maxraftstate*4/5 {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.pairs)
e.Encode(kv.sequence)
e.Encode(kv.newShard)
e.Encode(kv.outShards)
e.Encode(kv.authFlags)
e.Encode(kv.conf)
e.Encode(kv.garbages)
snapshot := w.Bytes()
kv.rf.Snapshot(kv.lastIndex, snapshot)
}
kv.mu.Unlock()
}
}
去重
因为之前一直采用存储操作历史记录的方式来去重,而分片迁移时无法特定对应切片的历史记录。所以需要更改历史记录为history map[int]map[int64]Result
,分切片存储历史记录。
在进行重复测试时,发现 TestConcurrent3_5B 中,有可能出现已经完成的指令在分片迁移之后才进行 RPC 的回复,这会导致 Get 无法及时回复有效的结果。
所有最后还是用通道回复结果(
这里参考了简书上的一篇博文1,为每个客户端分配 id 而非为操作分配 id,并让每个客户端记下每次请求的序号。server 侧存储每个客户端的最新已进行操作序号,序号小于记录的操作不再进行,以此达到去重。
type Clerk struct {
sm *shardctrler.Clerk
config shardctrler.Config
make_end func(string) *labrpc.ClientEnd
// You will have to modify this struct.
id int64
num int
}
func MakeClerk(ctrlers []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.sm = shardctrler.MakeClerk(ctrlers)
ck.make_end = make_end
// You'll have to add code here.
ck.id = nrand()
ck.num = 0
return ck
}
Challenge 1
Challenge 1 要求实现对无用内存的释放。
首先要明确我们可以删除什么样的数据。当一个节点向另一组发送分片后,如果能够确保该分片已被接受,则本地节点的分片数据可以不再保留。
func (kv *ShardKV) applyShardMove(data ShardMove) {
kv.mu.Lock()
defer kv.mu.Unlock()
if data.Num != kv.conf.Num-1 {
return
}
delete(kv.newShard, data.Shard)
if _, ok := kv.authFlags[data.Shard]; !ok {
kv.authFlags[data.Shard] = true
for k, v := range data.Pairs {
kv.pairs[k] = v
}
for k, v := range data.Sequence {
kv.sequence[k] = max(v, kv.sequence[k])
}
if _, ok := kv.garbages[data.Num]; !ok {
kv.garbages[data.Num] = make(map[int]bool)
}
kv.garbages[data.Num][data.Shard] = true
}
kv.cond.Broadcast()
}
也就是说要采取二阶段提交来确保正确的垃圾回收。
另起协程不断检测是否有垃圾需要进行处理,若有则向对应组发送 RPC 通知对方可以清除。
func (kv *ShardKV) requestGarbageCollection() {
_, isLeader := kv.rf.GetState()
kv.mu.Lock()
if !isLeader || len(kv.garbages) == 0 {
kv.mu.Unlock()
return
}
var wg sync.WaitGroup
for num, shards := range kv.garbages {
for shard := range shards {
wg.Add(1)
go func(shard int, cfg shardctrler.Config) {
defer wg.Done()
args := ShardArgs{Shard: shard, Num: cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := ShardReply{}
if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
kv.mu.Lock()
defer kv.mu.Unlock()
delete(kv.garbages[cfg.Num], shard)
if len(kv.garbages[cfg.Num]) == 0 {
delete(kv.garbages, cfg.Num)
}
}
}
}(shard, kv.mck.Query(num))
}
}
kv.mu.Unlock()
wg.Wait()
}
func (kv *ShardKV) GarbageCollection(args *ShardArgs, reply *ShardReply) {
reply.Err = ErrWrongLeader
kv.mu.Lock()
if _, ok := kv.outShards[args.Num]; !ok {
kv.mu.Unlock()
return
}
if _, ok := kv.outShards[args.Num][args.Shard]; !ok {
kv.mu.Unlock()
return
}
gc := GC{CfgNum: args.Num, Shard: args.Shard}
kv.mu.Unlock()
index, _, isLeader := kv.rf.Start(gc)
if isLeader {
ch := kv.getNotifyCh(index, true)
msg := kv.getNotified(index, ch)
if msg.success {
reply.Err = OK
}
}
}
Challenge 2
按照上面的实现,Challenge 2 可以直接 pass。
configuration 的依次更新保证了即便在配置更新过程中系统也能正常运行,而采用区分分片的处理权限状态就可以在配置完全更新前保证已完成分片的处理。
总结
历经千辛万苦,终于完成了所有 Lab。
虽然最终还是参考了别人的解决方案完成了 Lab 5B,但这段极度折磨的经历真的可以让人深刻体会到分布式系统的复杂性,前几天每天早上醒来想到还要继续调试 Lab5B 就不想起床了。。。
个人对所有 Lab 的难度排序是:Lab5B >>> Lab3B >> Lab3其他Part > Lab4 > 其他