挺早就听闻MIT6.824的大名,趁着这段时间比较闲,一边学习一边记录一下实验的完成过程。
MapReduce
MapReduce是一个用于大规模数据集并行处理的分布式计算框架。
步骤:
- 从分布式文件系统中加载文件并进行分片 ( Split ) .
- Master 节点通过心跳机制检测Worker节点状态,并分配任务给 Worker 节点。
- Worker 节点执行 Map 任务,将原始分配文件转换为中间键值对。随后,这些键值对会经过分区并在 shuffle 阶段进行分组和排序。
- Map 阶段结束后,Reduce Worker 接收分组后的键值对数据,执行 Reduce 操作并将最终结果文件输出到分布式文件系统中。
Lab1
Lab1的要求是在提供的code框架基础上,实现worker节点和Coordinator ( Master ) 节点的基本功能。
总体难度不高,思路也比较清晰。如果之前没怎么接触过go建议先看一下课程的LEC 5了解golang的并发设计方式。
Worker
对于 worker 节点,按照 hints 里的建议,首先修改Worker(),向Master节点发送RPC请求任务,得到任务后根据任务类型进行不同的操作。实验要求当没有任务时Worker应该退出, 这里我直接采用了 reply 中没有任务时Worker 自行结束的方案。
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// uncomment to send the Example RPC to the coordinator.
// CallExample()
for {
args := Args{}
reply := Reply{}
call("Coordinator.RPCMaster", &args, &reply)
if reply.MapTask != nil {
doMapTask(mapf, reply.MapTask, reply.NReduce)
} else if reply.ReduceTask != nil {
doReduceTask(reducef, reply.ReduceTask, reply.FilesLen)
} else {
return
}
}
}
Map阶段
func doMapTask(mapf func(string, string) []KeyValue, task *MapTask, nReduce int) error {
file, err := os.Open(task.Filename)
if err != nil {
log.Fatalf("cannot open %v", task.Filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannont read %v", task.Filename)
}
file.Close()
kva := mapf(task.Filename, string(content))
sort.Sort(ByKey(kva))
文件操作我们可以直接 copy mrsequential.go 中提供的部分。重点是分区操作,需要根据键的哈希值将键值对分配到对应的 Reduce 分区。
omap := make([][]KeyValue, nReduce)
for _, kv := range kva {
reduceNum := ihash(kv.Key) % nReduce
omap[reduceNum] = append(omap[reduceNum], kv)
}
for i := 0; i < nReduce; i++ {
intermediateFileName := fmt.Sprintf("map-%d-%d.json", task.Num, i)
tempFile, err := ioutil.TempFile(".", "tmp-")
tempFileName := tempFile.Name()
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
enc := json.NewEncoder(tempFile)
for _, kv := range omap[i] {
err := enc.Encode(&kv)
if err != nil {
return fmt.Errorf("failed to create json file: %w", err)
}
}
tempFile.Close()
if err := os.Rename(tempFileName, intermediateFileName); err != nil {
return fmt.Errorf("failed to rename temp file to target file: %w", err)
}
}
omap 数组存储本次 Map 结果不同键应处的桶,创建对应文件并写入。最后,我们需要让 Master 知道这个节点的任务已经完成,所以我们还需要定义另一个 RPC 函数。
args := task
var reply Reply
call("Coordinator.MapTaskComplete", &args, &reply)
return nil
}
Reduce阶段
Reduce 同理,其实实现 Worker 部分的注意点在实验 hints 的讲述都比较详细了, 具体的实现也写在了 mrsequential.go 中。
func doReduceTask(reducef func(string, []string) string, task *ReduceTask, fileLens int) error {
var kva []KeyValue
for i := 0; i < fileLens; i++ {
intermediateFileName := fmt.Sprintf("map-%d-%d.json", i, task.Num)
file, err := os.Open(intermediateFileName)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("Error opening file: %v\n", err)
}
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
}
sort.Sort(ByKey(kva))
收集任务 id 对应的 Map 任务的中间结果文件,然后 copy 一下 mrsequential.go 中具体的归约操作。
tempFile, err := ioutil.TempFile(".", "tmp-")
tempFileName := tempFile.Name()
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
oname := fmt.Sprintf("mr-out-%d", task.Num)
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
fmt.Fprintf(tempFile, "%v %v\n", kva[i].Key, output)
i = j
}
tempFile.Close()
if err := os.Rename(tempFileName, oname); err != nil {
return fmt.Errorf("failed to rename temp file to target file: %w", err)
}
args := task
var reply Reply
call("Coordinator.ReduceTaskComplete", &args, &reply)
return nil
}
最后,和 Map 阶段一致, 我们也需要 RPC 通知 Master 节点任务完成。
Coordinator
我使用的 Master 结构体:
type Coordinator struct {
// Your definitions here.
mapCompletedTaskNums int
reduceCompletedTaskNums int
nReduce int
mapTaskCompleted bool
reduceTaskCompleted bool
mapTasks []*MapTask
reduceTasks []*ReduceTask
mu sync.Mutex
cond *sync.Cond
}
思路就是 Master 维护两个任务列表, 在 Worker 请求任务时将任务分配过去并监测任务的状态。 任务失败则改回任务状态等待下一个来请求任务的节点, 任务成功则更改任务状态为成功并检查所有的阶段任务 ( Map/Reduce ) 是否全部完成。
(代码写得不怎么优雅,见谅)
RPC应答函数:
func (c *Coordinator) RPCMaster(args *Args, reply *Reply) error {
c.mu.Lock()
defer c.mu.Unlock()
for { //map
if c.mapTaskCompleted {
break
} else if task := c.fetchMapTask(); task != nil {
reply.MapTask = task
reply.NReduce = c.nReduce
c.mapTaskStart(task)
return nil
} else {
c.cond.Wait()
}
}
for { //reduce
if c.reduceTaskCompleted {
break
} else if task := c.fetchReduceTask(); task != nil {
reply.ReduceTask = task
reply.FilesLen = len(c.mapTasks)
c.reduceTaskStart(task)
return nil
} else {
c.cond.Wait()
}
}
return nil
}
任务开始函数:
func (c *Coordinator) mapTaskStart(task *MapTask) {
task.State = STARTED
go func(task *MapTask) {
timedue := time.After(10 * time.Second)
<-timedue
c.mu.Lock()
defer c.mu.Unlock()
if task.State != FINISHED {
log.Printf("recover map task %d \n", task.Num)
task.State = WAITTING
c.cond.Broadcast()
}
}(task)
}
计时器检查任务状态,若失败则改回任务状态并通知RPC协程停止阻塞。
任务完成函数:
func (c *Coordinator) MapTaskComplete(task *MapTask, reply *Reply) error {
c.mu.Lock()
defer c.mu.Unlock()
c.mapTasks[task.Num].State = FINISHED
c.mapCompletedTaskNums++
if c.mapCompletedTaskNums == len(c.mapTasks) {
c.mapTaskCompleted = true
c.cond.Broadcast()
}
return nil
}
在Worker完成任务后RPC调用此函数更改任务状态。