挺早就听闻MIT6.824的大名,趁着这段时间比较闲,一边学习一边记录一下实验的完成过程。
MapReduce
MapReduce是一个用于大规模数据集并行处理的分布式计算框架。
步骤:
- 从分布式文件系统中加载文件并进行分片 ( Split ) .
- Master 节点通过心跳机制检测Worker节点状态,并分配任务给 Worker 节点。
- Worker 节点执行 Map 任务,将原始分配文件转换为中间键值对。随后,这些键值对会经过分区并在 shuffle 阶段进行分组和排序。
- Map 阶段结束后,Reduce Worker 接收分组后的键值对数据,执行 Reduce 操作并将最终结果文件输出到分布式文件系统中。
Lab1
Lab1的要求是在提供的code框架基础上,实现worker节点和Coordinator ( Master ) 节点的基本功能。
总体难度不高,思路也比较清晰。如果之前没怎么接触过go建议先看一下课程的LEC 5了解golang的并发设计方式。
Worker
对于 worker 节点,按照 hints 里的建议,首先修改Worker(),向Master节点发送RPC请求任务,得到任务后根据任务类型进行不同的操作。实验要求当没有任务时Worker应该退出, 这里我直接采用了 reply 中没有任务时Worker 自行结束的方案。
// main/mrworker.go calls this function.func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. // uncomment to send the Example RPC to the coordinator. // CallExample() for { args := Args{} reply := Reply{} call("Coordinator.RPCMaster", &args, &reply) if reply.MapTask != nil { doMapTask(mapf, reply.MapTask, reply.NReduce) } else if reply.ReduceTask != nil { doReduceTask(reducef, reply.ReduceTask, reply.FilesLen) } else { return } }}
Map阶段
func doMapTask(mapf func(string, string) []KeyValue, task *MapTask, nReduce int) error { file, err := os.Open(task.Filename) if err != nil { log.Fatalf("cannot open %v", task.Filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannont read %v", task.Filename) } file.Close()
kva := mapf(task.Filename, string(content)) sort.Sort(ByKey(kva))
文件操作我们可以直接 copy mrsequential.go 中提供的部分。重点是分区操作,需要根据键的哈希值将键值对分配到对应的 Reduce 分区。
omap := make([][]KeyValue, nReduce) for _, kv := range kva { reduceNum := ihash(kv.Key) % nReduce omap[reduceNum] = append(omap[reduceNum], kv) } for i := 0; i < nReduce; i++ { intermediateFileName := fmt.Sprintf("map-%d-%d.json", task.Num, i) tempFile, err := ioutil.TempFile(".", "tmp-") tempFileName := tempFile.Name() if err != nil { return fmt.Errorf("failed to create temp file: %w", err) } enc := json.NewEncoder(tempFile) for _, kv := range omap[i] { err := enc.Encode(&kv) if err != nil { return fmt.Errorf("failed to create json file: %w", err) } } tempFile.Close() if err := os.Rename(tempFileName, intermediateFileName); err != nil { return fmt.Errorf("failed to rename temp file to target file: %w", err) } }
omap 数组存储本次 Map 结果不同键应处的桶,创建对应文件并写入。最后,我们需要让 Master 知道这个节点的任务已经完成,所以我们还需要定义另一个 RPC 函数。
args := task var reply Reply call("Coordinator.MapTaskComplete", &args, &reply)
return nil }
Reduce阶段
Reduce 同理,其实实现 Worker 部分的注意点在实验 hints 的讲述都比较详细了, 具体的实现也写在了 mrsequential.go 中。
func doReduceTask(reducef func(string, []string) string, task *ReduceTask, fileLens int) error { var kva []KeyValue for i := 0; i < fileLens; i++ { intermediateFileName := fmt.Sprintf("map-%d-%d.json", i, task.Num) file, err := os.Open(intermediateFileName) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("Error opening file: %v\n", err) } dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } } sort.Sort(ByKey(kva))
收集任务 id 对应的 Map 任务的中间结果文件,然后 copy 一下 mrsequential.go 中具体的归约操作。
tempFile, err := ioutil.TempFile(".", "tmp-") tempFileName := tempFile.Name() if err != nil { return fmt.Errorf("failed to create temp file: %w", err) } oname := fmt.Sprintf("mr-out-%d", task.Num) i := 0 for i < len(kva) { j := i + 1 for j < len(kva) && kva[j].Key == kva[i].Key { j++ } values := []string{} for k := i; k < j; k++ { values = append(values, kva[k].Value) } output := reducef(kva[i].Key, values)
fmt.Fprintf(tempFile, "%v %v\n", kva[i].Key, output) i = j } tempFile.Close() if err := os.Rename(tempFileName, oname); err != nil { return fmt.Errorf("failed to rename temp file to target file: %w", err) } args := task var reply Reply call("Coordinator.ReduceTaskComplete", &args, &reply) return nil}
最后,和 Map 阶段一致, 我们也需要 RPC 通知 Master 节点任务完成。
Coordinator
我使用的 Master 结构体:
type Coordinator struct { // Your definitions here. mapCompletedTaskNums int reduceCompletedTaskNums int nReduce int mapTaskCompleted bool reduceTaskCompleted bool mapTasks []*MapTask reduceTasks []*ReduceTask mu sync.Mutex cond *sync.Cond}
思路就是 Master 维护两个任务列表, 在 Worker 请求任务时将任务分配过去并监测任务的状态。 任务失败则改回任务状态等待下一个来请求任务的节点, 任务成功则更改任务状态为成功并检查所有的阶段任务 ( Map/Reduce ) 是否全部完成。
(代码写得不怎么优雅,见谅)
RPC应答函数:
func (c *Coordinator) RPCMaster(args *Args, reply *Reply) error { c.mu.Lock() defer c.mu.Unlock() for { //map if c.mapTaskCompleted { break } else if task := c.fetchMapTask(); task != nil { reply.MapTask = task reply.NReduce = c.nReduce c.mapTaskStart(task) return nil } else { c.cond.Wait() } } for { //reduce if c.reduceTaskCompleted { break } else if task := c.fetchReduceTask(); task != nil { reply.ReduceTask = task reply.FilesLen = len(c.mapTasks) c.reduceTaskStart(task) return nil } else { c.cond.Wait() } } return nil}
任务开始函数:
func (c *Coordinator) mapTaskStart(task *MapTask) { task.State = STARTED go func(task *MapTask) { timedue := time.After(10 * time.Second) <-timedue c.mu.Lock() defer c.mu.Unlock() if task.State != FINISHED { log.Printf("recover map task %d \n", task.Num) task.State = WAITTING c.cond.Broadcast() } }(task)}
计时器检查任务状态,若失败则改回任务状态并通知RPC协程停止阻塞。
任务完成函数:
func (c *Coordinator) MapTaskComplete(task *MapTask, reply *Reply) error { c.mu.Lock() defer c.mu.Unlock() c.mapTasks[task.Num].State = FINISHED c.mapCompletedTaskNums++ if c.mapCompletedTaskNums == len(c.mapTasks) { c.mapTaskCompleted = true c.cond.Broadcast() } return nil}
在Worker完成任务后RPC调用此函数更改任务状态。