1331 字
7 分钟
MIT 6.5840(原 6.824) 通关笔记 Lab1 MapReduce

挺早就听闻MIT6.824的大名,趁着这段时间比较闲,一边学习一边记录一下实验的完成过程。

MapReduce#

MapReduce

MapReduce是一个用于大规模数据集并行处理的分布式计算框架。

步骤:

  1. 从分布式文件系统中加载文件并进行分片 ( Split ) .
  2. Master 节点通过心跳机制检测Worker节点状态,并分配任务给 Worker 节点。
  3. Worker 节点执行 Map 任务,将原始分配文件转换为中间键值对。随后,这些键值对会经过分区并在 shuffle 阶段进行分组和排序。
  4. Map 阶段结束后,Reduce Worker 接收分组后的键值对数据,执行 Reduce 操作并将最终结果文件输出到分布式文件系统中。

Lab1#

Lab1的要求是在提供的code框架基础上,实现worker节点和Coordinator ( Master ) 节点的基本功能。

总体难度不高,思路也比较清晰。如果之前没怎么接触过go建议先看一下课程的LEC 5了解golang的并发设计方式。

Worker#

对于 worker 节点,按照 hints 里的建议,首先修改Worker(),向Master节点发送RPC请求任务,得到任务后根据任务类型进行不同的操作。实验要求当没有任务时Worker应该退出, 这里我直接采用了 reply 中没有任务时Worker 自行结束的方案。

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.
	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
	for {
		args := Args{}
		reply := Reply{}
		call("Coordinator.RPCMaster", &args, &reply)
		if reply.MapTask != nil {
			doMapTask(mapf, reply.MapTask, reply.NReduce)
		} else if reply.ReduceTask != nil {
			doReduceTask(reducef, reply.ReduceTask, reply.FilesLen)
		} else {
			return
		}
	}
}

Map阶段#

func doMapTask(mapf func(string, string) []KeyValue, task *MapTask, nReduce int) error {
	file, err := os.Open(task.Filename)
	if err != nil {
		log.Fatalf("cannot open %v", task.Filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannont read %v", task.Filename)
	}
	file.Close()

	kva := mapf(task.Filename, string(content))
	sort.Sort(ByKey(kva))

文件操作我们可以直接 copy mrsequential.go 中提供的部分。重点是分区操作,需要根据键的哈希值将键值对分配到对应的 Reduce 分区。

omap := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		reduceNum := ihash(kv.Key) % nReduce
		omap[reduceNum] = append(omap[reduceNum], kv)
	}
	for i := 0; i < nReduce; i++ {
		intermediateFileName := fmt.Sprintf("map-%d-%d.json", task.Num, i)
		tempFile, err := ioutil.TempFile(".", "tmp-")
		tempFileName := tempFile.Name()
		if err != nil {
			return fmt.Errorf("failed to create temp file: %w", err)
		}
		enc := json.NewEncoder(tempFile)
		for _, kv := range omap[i] {
			err := enc.Encode(&kv)
			if err != nil {
				return fmt.Errorf("failed to create json file: %w", err)
			}
		}
		tempFile.Close()
		if err := os.Rename(tempFileName, intermediateFileName); err != nil {
			return fmt.Errorf("failed to rename temp file to target file: %w", err)
		}
	}

omap 数组存储本次 Map 结果不同键应处的桶,创建对应文件并写入。最后,我们需要让 Master 知道这个节点的任务已经完成,所以我们还需要定义另一个 RPC 函数。

		args := task
		var reply Reply
		call("Coordinator.MapTaskComplete", &args, &reply)
	
		return nil
	}

Reduce阶段#

Reduce 同理,其实实现 Worker 部分的注意点在实验 hints 的讲述都比较详细了, 具体的实现也写在了 mrsequential.go 中。

func doReduceTask(reducef func(string, []string) string, task *ReduceTask, fileLens int) error {
	var kva []KeyValue
	for i := 0; i < fileLens; i++ {
		intermediateFileName := fmt.Sprintf("map-%d-%d.json", i, task.Num)
		file, err := os.Open(intermediateFileName)
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("Error opening file: %v\n", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}
	sort.Sort(ByKey(kva))

收集任务 id 对应的 Map 任务的中间结果文件,然后 copy 一下 mrsequential.go 中具体的归约操作。

tempFile, err := ioutil.TempFile(".", "tmp-")
	tempFileName := tempFile.Name()
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	oname := fmt.Sprintf("mr-out-%d", task.Num)
	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		fmt.Fprintf(tempFile, "%v %v\n", kva[i].Key, output)
		i = j
	}
	tempFile.Close()
	if err := os.Rename(tempFileName, oname); err != nil {
		return fmt.Errorf("failed to rename temp file to target file: %w", err)
	}
	args := task
	var reply Reply
	call("Coordinator.ReduceTaskComplete", &args, &reply)
	return nil
}

最后,和 Map 阶段一致, 我们也需要 RPC 通知 Master 节点任务完成。

Coordinator#

我使用的 Master 结构体:

type Coordinator struct {
	// Your definitions here.
	mapCompletedTaskNums    int
	reduceCompletedTaskNums int
	nReduce                 int
	mapTaskCompleted        bool
	reduceTaskCompleted     bool
	mapTasks                []*MapTask
	reduceTasks             []*ReduceTask
	mu                      sync.Mutex
	cond                    *sync.Cond
}

思路就是 Master 维护两个任务列表, 在 Worker 请求任务时将任务分配过去并监测任务的状态。 任务失败则改回任务状态等待下一个来请求任务的节点, 任务成功则更改任务状态为成功并检查所有的阶段任务 ( Map/Reduce ) 是否全部完成。

(代码写得不怎么优雅,见谅)

RPC应答函数:

func (c *Coordinator) RPCMaster(args *Args, reply *Reply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for { //map
		if c.mapTaskCompleted {
			break
		} else if task := c.fetchMapTask(); task != nil {
			reply.MapTask = task
			reply.NReduce = c.nReduce
			c.mapTaskStart(task)
			return nil
		} else {
			c.cond.Wait()
		}
	}
	for { //reduce
		if c.reduceTaskCompleted {
			break
		} else if task := c.fetchReduceTask(); task != nil {
			reply.ReduceTask = task
			reply.FilesLen = len(c.mapTasks)
			c.reduceTaskStart(task)
			return nil
		} else {
			c.cond.Wait()
		}
	}
	return nil
}

任务开始函数:

func (c *Coordinator) mapTaskStart(task *MapTask) {
	task.State = STARTED
	go func(task *MapTask) {
		timedue := time.After(10 * time.Second)
		<-timedue
		c.mu.Lock()
		defer c.mu.Unlock()
		if task.State != FINISHED {
			log.Printf("recover map task %d \n", task.Num)
			task.State = WAITTING
			c.cond.Broadcast()
		}
	}(task)
}

计时器检查任务状态,若失败则改回任务状态并通知RPC协程停止阻塞。

任务完成函数:

func (c *Coordinator) MapTaskComplete(task *MapTask, reply *Reply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.mapTasks[task.Num].State = FINISHED
	c.mapCompletedTaskNums++
	if c.mapCompletedTaskNums == len(c.mapTasks) {
		c.mapTaskCompleted = true
		c.cond.Broadcast()
	}
	return nil
}

在Worker完成任务后RPC调用此函数更改任务状态。

MIT 6.5840(原 6.824) 通关笔记 Lab1 MapReduce
https://yomi.moe/posts/mit65840-1/
作者
藍々
发布于
2024-11-15
许可协议
CC BY-NC-SA 4.0