MIT-6.824 Distributed Systems-Lab 1 MapReduce

MIT-6.824(Spring 2022)Lab 1 MapReduce

6.824 Lab 1: MapReduce

简介

构建一个MapReduce系统

  1. 实现一个worker进程,调用Map和Reduce函数、处理读写文件,
  2. 实现coordinator进程,向worker分发任务并提供容错机制。

准备开始

src/main/mrsequential.go 中提供了串行的mapreduce程序,在单进程里面直接顺序执行Map操作和Reduce操作

同时提供了一些MapReduce的应用程序:

mrapps/wc.go:WordCount程序

mrapps/indexer.go:text-indexer

按照如下的方式运行串行的mapreduce程序:

cd src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0

输出的文件中是对文件的WordCount结果

代码理解

插件模式编译

参考资料

Go是静态编译型语言,在编译时就将所有引用的包(库)全部加载打包到最终的可执行程序(或库文件)中,因此并不能在运行时动态加载其他共享库。Go Plugin提供了这样一种方式,能够让你在运行时动态加载外部功能。

  • 可插拔:有了Plugin,我的程序可以根据需要随时替换其中某些部件而不用修改我的程序;
  • 动态加载的需要:有些模块只有在运行时才能确定,需要动态加载外部的功能模块;
  • 独立开发:Plugin 可以和主程序独立建设,主程序只需要制定好框架,实现默认(模版)功能。Plugin 可根据用户需求随时自行扩展开发,运行时随意替换,提高了程序的可定制性;

type Plugin即Golang加载的插件,与之有关的两个方法:

  • Open: 根据参数path提供的插件路径加载这个插件,并返回插件这个插件结构的指针*Plugin
  • Lookup: *Plugin的惟一方法,通过名称symName在插件中寻找对应的变量或方法,以Symbol的形式返回

因此这一行命令将 wc.go文件编译成了一个插件 wc.so(默认文件名),从而可以插入到MapReduce主程序中运行。

wc.go-Map函数

// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

对每一个传进来的字符串,通过 strings.FieldsFunc函数找到字符串的分割点,分割成单独的单词,构造成KeyValue结构体并合并成切片返回

wc.go-Reduce函数

// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}

直接以字符串的形式返回values的长度

串行MapReduce运行

导入插件

// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)

	return mapf, reducef
}

从编译好的*.so文件中查找Map函数和Reduce函数,通过函数的返回值类型进行类型推断,最终返回两个函数通过主函数里面的变量进行接收

mapf, reducef := loadPlugin(os.Args[1])

打开文件,进行Map操作

       //
// read each input file,
// pass it to Map,
// accumulate the intermediate Map output.
//
intermediate := []mr.KeyValue{}
for _, filename := range os.Args[2:] {
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))
	intermediate = append(intermediate, kva...)
}

pg-*.txt会匹配到所有满足条件的文件,将文件逐个打开,读取文件内容,通过Map函数处理成中间数据格式,存入中间变量intermediate

排序

对中间变量的切片按照键的字典序进行排序

sort.Sort(ByKey(intermediate))
// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

这里是通过实现Sort的接口实现了自定义排序

统计Reduce

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
	j := i + 1
	for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
		j++
	}
	values := []string{}
	for k := i; k < j; k++ {
		values = append(values, intermediate[k].Value)
	}
	output := reducef(intermediate[i].Key, values)

	// this is the correct format for each line of Reduce output.
	fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

	i = j
}

由于已经排好顺序了,从左到右遍历一遍就可以统计每一个键出现的数量,然后输出到文件即可。

我的工作

实现一个分布式MapReduce,由coordinator和worker两个程序组成。

coordinator进程只有一个,worker进程有一个或多个并行执行。

worker进程将通过RPC与coordinator进程进行通信。每个worker进程将向coordinator进程请求任务,从一个或多个文件中读取任务的输入,执行任务,并将任务的输出写入一个或更多个文件。

coordinator进程应该注意到一个worker进程是否没有在合理的时间内完成其任务(10秒),并将相同的任务交给另一个worker进程。

coordinator和worker的“main”函数在 main/mrcordinator.gomain/mrworker.go

实现应该在 mr/coordinator.gomr/worker.gomr/rpc.go中。

测试运行:

go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrcoordinator.go pg-*.txt

go run -race mrworker.go wc.so
go run -race mrworker.go wc.so

测试脚本:

bash test-mr.sh

代码理解

待补充的代码提供了一个RPC的示例

启动Worker后,会调用CallExample()函数

// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	// the "Coordinator.Example" tells the
	// receiving server that we'd like to call
	// the Example() method of struct Coordinator.
	ok := call("Coordinator.Example", &args, &reply)
	if ok {
		// reply.Y should be 100.
		fmt.Printf("reply.Y %v\n", reply.Y)
	} else {
		fmt.Printf("call failed!\n")
	}
}

函数构建了RPC的结构体,然后调用call函数并接收响应

在这里体现了RPC的核心思想:在这里看起来就是调用的本地函数call,但是实际上call内部是与coordinator进行通信,然后在远程得到返回值后返回给reply结构体,因此为“远程过程调用”

call函数:

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}

注意coordinatorSock()方法,会获取一个临时文件,通信是通过这个临时文件进行的。

在coordinator.go内部,RPC指定的方法"Coordinator.Example":

// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	reply.Y = args.X + 1
	return nil
}

因此返回的结构体中reply.Y的值就为100

在启动Worker前要先启动Coordinator,启动后首先创建一个Coordinator结构:

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.

	c.server()
	return &c
}

其中调用server方法,监听Worker的RPC:

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

Coordinator会不断检测Done方法的返回值,一旦为true,Coordinator就会退出:

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
	ret := false

	// Your code here.

	return ret
}

Map简单实现

首先考虑简单一些,不考虑并行、容错处理等,先把整个的流程跑通。

首先跑通Map流程

Coordinator的数据结构:

type Coordinator struct {
	// Your definitions here.
	MapTask    []MapTaskInformation    // Map任务列表
	ReduceTask []ReduceTaskInformation // Reduce任务列表
}

内部有两个切片,分别对应Map的任务列表和Reduce的任务列表。

两个任务列表是在Coordinator启动的时候就设置好:

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {

	mapTaskSlice := []MapTaskInformation{}

	for id, fileName := range files {
		mapTaskSlice = append(mapTaskSlice, MapTaskInformation{
			Id:                   id + 1,
			State:                0,
			NReduce:              nReduce,
			OriginFileName:       fileName,
			IntermediateFileName: "mr-" + strconv.Itoa(id+1) + "-",
		})
	}

	reduceTaskSlice := []ReduceTaskInformation{}

	for i := 0; i < nReduce; i++ {
		reduceTaskSlice = append(reduceTaskSlice, ReduceTaskInformation{
			Id:             i + 1,
			State:          0,
			OriginFileName: "mr-0-" + strconv.Itoa(i+1),
			OutputFileName: "mr-" + strconv.Itoa(i+1),
		})
	}

	c := Coordinator{
		MapTask:    mapTaskSlice,
		ReduceTask: reduceTaskSlice,
	}

	// Your code here.

	c.server()
	return &c
}

其中为Map和Reduce暂时设计的数据结构:

type MapTaskInformation struct {
	Id                   int    // 任务唯一编码
	State                int    // 0表示未开始,1表示正在进行,2表示已经完成
	NReduce              int    // 分成Reduce任务的数量
	OriginFileName       string // 原始文件名称
	IntermediateFileName string // Map任务完成后的文件名称(中间文件)
}
type ReduceTaskInformation struct {
	Id             int    // 任务唯一编码
	State          int    // 0表示未开始,1表示正在进行,2表示已经完成
	OriginFileName string // Reduce的初始文件名称(中间文件)
	OutputFileName string // Reduce任务完成后的最终文件名称
}

Worker启动时,通过RPC向Coordinator要一个任务

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	args := TaskInformation{}
	reply := TaskInformation{}

	ok := call("Coordinator.AsssignTask", &args, &reply)

Coordinator会遍历自己内部的所有任务列表,找到第一个还没有完成的任务分配给这个Worker:

// 分配任务
func (c *Coordinator) AsssignTask(args *TaskInformation, reply *TaskInformation) error {
	isMapfinished := true
	//遍历所有的Map任务信息,将未开始的分配给这个节点
	for i, mapTask := range c.MapTask {
		if mapTask.State == 0 {
			isMapfinished = false
			reply.Id = mapTask.Id
			reply.TaskType = "map"
			reply.InputFileName = mapTask.OriginFileName
			reply.OutputFileName = mapTask.IntermediateFileName
			reply.NReduce = mapTask.NReduce
			c.MapTask[i].State = 1
			return nil
		} else if mapTask.State == 1 {
			isMapfinished = false
		}
	}
	// 如果所有的Map任务都完成了,就遍历Reduce任务
	if isMapfinished {
		for _, reduceTask := range c.ReduceTask {
			if reduceTask.State == 0 {
				return nil
			}
		}
	}
	return nil
}

Worker接收到任务后使用插件中的Map函数进行处理,并将成功完成任务的消息通过RPC的方式返回给Coordinator

if ok {
	fmt.Println("Call Success!")
	if reply.TaskType == "map" {
		fmt.Printf("Map Task!\n")
		intermediate := []KeyValue{}
		file, err := os.Open(reply.InputFileName)
		if err != nil {
			log.Fatalf("cannot open %v", reply.InputFileName)
		}
		content, err := io.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", reply.InputFileName)
		}
		file.Close()
		kva := mapf(reply.InputFileName, string(content))
		intermediate = append(intermediate, kva...)

		// 排序
		sort.Sort(ByKey(intermediate))

		fmt.Println(intermediate)
		args = reply
		call("Coordinator.TaskFinish", &args, &reply)

Coordinator接收消息,将自己内部的任务状态修改,后续就不会再将这个任务分配给Worker了。

// 接收任务已经完成的信息
func (c *Coordinator) TaskFinish(args *TaskInformation, reply *TaskInformation) error {
	if args.TaskType == "map" {
		c.MapTask[args.Id-1].State = 2
	} else if args.TaskType == "reduce" {
		c.ReduceTask[args.Id-1].State = 2
	}
	return nil
}

问题:

  1. Worker要任务的时候Coordinator去列表中遍历是不是有点太傻了,有更好的办法吗?比如Coordinator维护未完成的和已完成的任务列表,然后动态更新?
  2. 定义的struct数据结构不一定合理,还要看后面怎么用
  3. RPC传递的数据结构不是很合理,而且有大量的冗余,比如后面的消息args和reply几乎完全相同,后面需要修改

Reduce简单实现

首先在Worker的主函数增加一层循环,从而使Worker不断请求任务,由Coordinator按需分配

首先要构造中间文件,也就是map结束后的文件需要存起来,然后才能用reduce去处理

// 循环创建NReduce个文件准备保存
encoderList := make([]*json.Encoder, 0)
for i := 0; i < reply.NReduce; i++ {
	fileName := reply.OutputFileName + strconv.FormatInt(int64(i+1), 10)
	tempFile, err := os.Create(fileName)
	if err != nil {
		log.Fatalf("cannot create %v", fileName)
	}
	defer tempFile.Close()
	encoderList = append(encoderList, json.NewEncoder(tempFile))
}
for i, v := range intermediate {
	encoderList[ihash(v.Key)%reply.NReduce].Encode(&intermediate[i])
}

map在保存的时候要直接分成NReduce的文件,文件的内容是由哈希函数对键进行映射后得到的,保证键大致平均分到NReduce个节点上

保存文件的时候使用的是json的格式,保存的过程有些慢,需要对整个map的结果全部遍历一遍,后续可以考虑并行处理?

Reduce内容:

} else if reply.TaskType == "reduce" {

	ofile, _ := os.Create(reply.OutputFileName)

	fmt.Printf("Reduce Task!\n")
	kva := make([]KeyValue, 0)
	for p := 1; p <= 8; p++ {
		filename := strings.Replace(reply.InputFileName, "*", strconv.FormatInt(int64(p), 10), 1)
		fmt.Println(filename)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
	}
	// 排序
	sort.Sort(ByKey(kva))
	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

		i = j
	}

循环读取map保存下来的内容,这里写死了,后面需要调整。

读取内容后汇总并排序,排序后直接使用串行的Reduce代码即可

对于Coordinator,将Reduce的内容添加进去即可:

// 如果所有的Map任务都完成了,就遍历Reduce任务
if isMapfinished {
	for i, reduceTask := range c.ReduceTask {
		if reduceTask.State == 0 {
			reply.Id = reduceTask.Id
			reply.TaskType = "reduce"
			reply.InputFileName = reduceTask.OriginFileName
			reply.OutputFileName = reduceTask.OutputFileName
			mu.Lock()
			c.ReduceTask[i].State = 1
			mu.Unlock()
			return nil
		}
	}

}

Reduce结束后需要告知主Coordinator在无限循环的Done(),返回True让其退出:

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
	ret := true
	mu.Lock()
	// Your code here.
	for _, v := range c.ReduceTask {
		if v.State != 2 {
			ret = false
			break
		}
	}
	mu.Unlock()
	return ret
}

中间添加了锁,但是添加的有些问题,后面需要调整。

到这里的代码除了异常处理外已经都能测试通过了,只不过是有data race问题

问题:

  1. Worker的无限循环退不出去,需要Coordinator通过RPC的方式告知才可以
  2. Reduce的遍历文件写死了,需要动态变化去判断
  3. Coordinator存在data race问题,是循环遍历任务和对任务的完成情况进行更改后两者的锁加的不太好导致的,需要对数据结构进行修改
  4. 没有异常处理,不能处理有Worker异常退出的情况,实际测试中陷入了死循环,需要进行调整

问题及解决

首先将Coordinator对于任务的数据结构更改,内部维护三个双向链表,分别表示未开始的任务,正在进行的任务和已经结束的任务,链表外面使用map的数据结构,从而支持快速查找。在生成任务的时候自动赋值一个全局唯一的id。

数据结构中要包括全部的信息,主要变化部分是对输入和输出的信息,将Map的输入、输出和Reduce的输出都在初始化的时候直接写在结构体中,避免后续进行多次判断和修改。

结构体:

// Coordinator存储的主要信息,包括Map和Reduce两部分任务的信息以及工作节点的信息
type Coordinator struct {
	UniqueIdSlice     []*list.Element // 通过任务Id找到任务信息的切片,相当于一个Map
	MapTaskNum        int             // map任务总数量
	ReduceTaskNum     int             // reduce任务总数量
	WorkerNum         int             // 目前正在工作的节点数量
	MapTask                           // Map任务信息链表
	ReduceTask                        // Reduce任务信息链表
	WorkerInformation                 // Worker的信息
}

// Map任务信息链表,包括三个链表,分别表示未开始、正在进行和已经完成的任务
type MapTask struct {
	MapListReady    *list.List // 未开始的Map任务
	MapListRunning  *list.List // 正在进行的Map任务
	MapListComplete *list.List // 已经完成的Map任务
}

// Reduce任务信息链表,包括三个链表,分别表示未开始、正在进行和已经完成的任务
type ReduceTask struct {
	ReduceListReady    *list.List // 未开始的Reduce任务
	ReduceListRunning  *list.List // 正在进行的Reduce任务
	ReduceListComplete *list.List // 已经完成的Reduce任务
}

// Map任务具体信息
type MapTaskInformation struct {
	Id                   int      // 任务唯一编码
	OriginFileName       string   // 原始文件名称
	IntermediateFileName []string // Map任务完成后中间文件列表
}

// Reduce任务具体信息
type ReduceTaskInformation struct {
	Id                   int      // 任务唯一编码
	IntermediateFileName []string // Reduce的初始中间文件列表(从Map处获得)
	OutputFileName       string   // Reduce任务完成后的最终文件名称
}

Worker中分为几个步骤:

  1. 告知Coordinator自己已经上线
  2. 向Coordinator请求任务
  3. 向Coordinator返回自己的Map任务已经完成
  4. 向Coordinator返回自己的Reduce任务已经完成
  5. 向Coordinator返回自己退出的消息

主程序如下:

// main/mrworker.go 调用的函数
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {

	// 1. 告知Coordinator自己已经上线
	args := WorkerArgs{TaskType: "None"}
	reply := WorkerReply{TaskType: "None"}
	call("Coordinator.WorkerOnline", &args, &reply)

	// 无限循环向Coordinator请求任务
	for {
		// 2. 向Coordinator请求任务
		args = WorkerArgs{TaskType: "None"}
		reply = WorkerReply{TaskType: "None"}
		ok := call("Coordinator.AsssignTask", &args, &reply)

		if ok {

			fmt.Println("Call Success!")

			if reply.TaskType == "map" {

				fmt.Printf("Map Task!\n")

				// 读取文件,调用map函数进行处理
				intermediate := []KeyValue{}
				file, err := os.Open(reply.MapInput)
				if err != nil {
					log.Fatalf("cannot open %v", reply.MapInput)
				}
				content, err := io.ReadAll(file)
				if err != nil {
					log.Fatalf("cannot read %v", reply.MapInput)
				}
				file.Close()
				kva := mapf(reply.MapInput, string(content))
				intermediate = append(intermediate, kva...)

				// 循环创建NReduce个文件准备保存
				encoderList := make([]*json.Encoder, 0)
				for _, fileName := range reply.MapOutput {
					tempFile, err := os.Create(fileName)
					if err != nil {
						log.Fatalf("cannot create %v", fileName)
					}
					defer tempFile.Close()
					encoderList = append(encoderList, json.NewEncoder(tempFile))
				}
				// 将map后的结果存入文件中(最费时间)
				for i, v := range intermediate {
					encoderList[ihash(v.Key)%len(reply.MapOutput)].Encode(&intermediate[i])
				}

				// 3. 向Coordinator返回自己的Map任务已经完成
				args.TaskType = "map"
				args.Taskid = reply.Id
				call("Coordinator.TaskFinish", &args, &reply)

			} else if reply.TaskType == "reduce" {

				fmt.Printf("Reduce Task!\n")

				// 创建输出文件
				ofile, _ := os.Create(reply.ReduceOutput)

				// 遍历输入文件,汇总Map产生的所有结果
				kva := make([]KeyValue, 0)
				for _, filename := range reply.ReduceInput {
					// fmt.Println(filename)
					file, err := os.Open(filename)
					if err != nil {
						log.Fatalf("cannot open %v", filename)
					}
					dec := json.NewDecoder(file)
					for {
						var kv KeyValue
						if err := dec.Decode(&kv); err != nil {
							break
						}
						kva = append(kva, kv)
					}
				}

				// 排序
				sort.Sort(ByKey(kva))

				// 在已经排好序的键值对上进行统计,并写入到文件中
				i := 0
				for i < len(kva) {
					j := i + 1
					for j < len(kva) && kva[j].Key == kva[i].Key {
						j++
					}
					values := []string{}
					for k := i; k < j; k++ {
						values = append(values, kva[k].Value)
					}
					output := reducef(kva[i].Key, values)

					fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

					i = j
				}

				// 4. 向Coordinator返回自己的Reduce任务已经完成
				args.Taskid = reply.Id
				args.TaskType = "reduce"
				call("Coordinator.TaskFinish", &args, &reply)

			} else if reply.TaskType == "finish" {

				// 5. 向Coordinator返回自己退出的消息
				call("Coordinator.WorkerFinish", &args, &reply)
				fmt.Printf("Bye!\n")
				return
			}
		} else {
			fmt.Printf("Call failed!\n")
		}

		// 间隔1秒请求一次
		time.Sleep(time.Second)
	}
}

其中将RPC的发送和接收的结构体更改的更为合理:

// Worker向Coordinator传递的信息
type WorkerArgs struct {
	Id       int    // Worker的唯一ID
	Taskid   int    // 任务全局唯一ID
	TaskType string // 任务类型
}

// Coordinator向Worker传递的信息
type WorkerReply struct {
	Id           int      // 任务id
	TaskType     string   // 任务类型
	MapInput     string   // Map任务的输入
	MapOutput    []string // Map任务的输出
	ReduceInput  []string // Reduce任务的输入
	ReduceOutput string   // Reduce任务的输出
}
  • 告知Coordinator自己已经上线:
// Worker告知Coordinator自己上线了
func (c *Coordinator) WorkerOnline(args *WorkerArgs, reply *WorkerReply) error {
	mu.Lock()
	if c.WorkerNum == -1 {
		c.WorkerNum = 0
	}
	c.WorkerNum += 1
	mu.Unlock()
	return nil
}

这里暂时比较简单,后续需要进行处理,以进行异常处理

  • 向Coordinator请求任务:
// Worker向Coordinator请求任务
func (c *Coordinator) AsssignTask(args *WorkerArgs, reply *WorkerReply) error {

	mu.Lock()

	// 首先查看map任务是否已经全部完成,如果全部完成了就去完成Reduce任务,如果也全部完成了就发送Worker可以退出的消息
	// 判断方式:通过完成链表的节点数量与初始化时侯计算的数量是否相同

	if c.MapListComplete.Len() != c.MapTaskNum {

		// 分配map任务

		if c.MapListReady.Len() == 0 {

			// 没有没开始的Map任务
			reply.TaskType = "waiting"

		} else {

			// 将一个未完成的任务从未开始的链表中取出,插入到正在进行的链表里面
			e := c.MapListReady.Front()
			c.MapListReady.Remove(e)
			c.MapListRunning.PushBack(e)

			// 构建返回消息,告知Worker这个任务的信息
			reply.TaskType = "map"
			value := e.Value.(MapTaskInformation)
			reply.Id = value.Id
			reply.MapInput = value.OriginFileName
			reply.MapOutput = value.IntermediateFileName
		}
	} else if c.ReduceListComplete.Len() != c.ReduceTaskNum {

		// 分配reduce任务

		if c.ReduceListReady.Len() == 0 {
			// 没有没开始的Reduce任务
			reply.TaskType = "waiting"

		} else {

			// 将一个未完成的任务从未开始的链表中取出,插入到正在进行的链表里面
			e := c.ReduceListReady.Front()
			c.ReduceListReady.Remove(e)
			c.ReduceListRunning.PushBack(e)

			// 构建返回消息,告知Worker这个任务的信息
			reply.TaskType = "reduce"
			value := e.Value.(ReduceTaskInformation)
			reply.Id = value.Id
			reply.ReduceInput = value.IntermediateFileName
			reply.ReduceOutput = value.OutputFileName

		}
	} else {

		//告知Worker已经没有任务了,可以退出了
		reply.TaskType = "finish"
	}

	mu.Unlock()

	return nil
}

收到请求后操作全局链表,构建消息并返回即可

  • 向Coordinator返回自己的任务已经完成
// Worker告知Coordinator刚才分配的任务已经完成
func (c *Coordinator) TaskFinish(args *WorkerArgs, reply *WorkerReply) error {

	mu.Lock()

	// 将节点从正在进行的链表中取出,插入到已经完成的链表中
	if args.TaskType == "map" {

		// 操作节点
		e := c.UniqueIdSlice[args.Taskid]
		c.MapListRunning.Remove(e)
		c.MapListComplete.PushBack(e)

		// 如果是Map任务,需要将产生的nReduce个中间文件分配给Reduce节点
		for _, file := range e.Value.(MapTaskInformation).IntermediateFileName {

			// 计算是哪个Reduce节点
			reduceTaskNum, err := strconv.Atoi(strings.Split(file, "-")[2])
			if err != nil {
				log.Fatalf("cannot parseInt %v", file)
			}

			// 将产生的nReduce个中间文件分配给Reduce节点(需要重新构建节点)
			value := c.UniqueIdSlice[reduceTaskNum].Value
			tempSlice := append(value.(ReduceTaskInformation).IntermediateFileName, file)
			c.UniqueIdSlice[reduceTaskNum].Value = ReduceTaskInformation{
				Id:                   value.(ReduceTaskInformation).Id,
				IntermediateFileName: tempSlice,
				OutputFileName:       value.(ReduceTaskInformation).OutputFileName,
			}
		}
	} else if args.TaskType == "reduce" {

		// 操作节点
		e := c.ReduceListRunning.Remove(c.UniqueIdSlice[args.Taskid])
		c.ReduceListComplete.PushBack(e)
	}
	mu.Unlock()
	return nil
}

对于Map任务需要传递Map输出,Reduce输入的文件信息,将结构体填充完整

  • 向Coordinator返回自己退出的消息
// Worker告知Coordinator自己退出了
func (c *Coordinator) WorkerFinish(args *WorkerArgs, reply *WorkerReply) error {

	mu.Lock()

	// 退出时将Coordinator内部存储的Worker数量-1
	c.WorkerNum -= 1

	mu.Unlock()

	return nil
}

将全局的WorkerNum减去1,后续需要进行处理。

经测试,除异常检测完已经都能顺利pass,多次运行的结果也完全相同

有一个小问题是它的脚本给的超时时间不够,调大一些后才能顺利运行,后续可以进行更改。

异常处理

原文与异常处理相关的部分:

The coordinator should notice if a worker hasn’t completed its task in a reasonable amount of time (for this lab, use ten seconds), and give the same task to a different worker.

The best you can do is have the coordinator wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the coordinator wait for ten seconds; after that the coordinator should assume the worker has died (of course, it might not have).

To test crash recovery, you can use the mrapps/crash.go application plugin. It randomly exits in the Map and Reduce functions.

可以先查看crash.go,看看是如何模拟线程崩溃的:

func maybeCrash() {
	max := big.NewInt(1000)
	rr, _ := crand.Int(crand.Reader, max)
	if rr.Int64() < 330 {
		// crash!
		os.Exit(1)
	} else if rr.Int64() < 660 {
		// delay for a while.
		maxms := big.NewInt(10 * 1000)
		ms, _ := crand.Int(crand.Reader, maxms)
		time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
	}
}

阅读代码,可以发现这个设置是有1/3的概率直接崩溃掉,有2/3的概率线程睡眠不到10s,模拟的环境还是比较简单的。

实现:

Worker部分:

Worker上线后,由Coordinator为其分配一个ID,随后在Worker的每一个rpc请求中都带有这个ID

WorkerID := reply.WorkerID

Worker上线后每5秒发送心跳信号给Coordinator,表明自己在线

// 心跳信号
go func() {
	for {
		args := WorkerArgs{TaskType: "None"}
		args.Id = WorkerID
		reply := WorkerReply{TaskType: "None"}
		time.Sleep(time.Second * 5)
		call("Coordinator.WorkerAlive", &args, &reply)
	}
}()

Coordinator部分:

维护一个切片结构体,索引表示Worker的ID,结构体内部包括任务ID和上一次心跳信号的时间

type HeartBeat struct {
	WorkID int
	Time   int64
}
var WorkerList []HeartBeat

接收到Worker上线的RPC后,记录当前的时间戳,记录任务ID为-1,即表示这个索引ID已经分配给Worker了

// 分配任务ID并记录时间
WorkerList = append(WorkerList, HeartBeat{
	WorkID: -1,
	Time:   time.Now().Unix(),
})
reply.WorkerID = len(WorkerList)

接收心跳信号后更新切片结构体

// Coordinator接收心跳信号
func (c *Coordinator) WorkerAlive(args *WorkerArgs, reply *WorkerReply) error {
	mu.Lock()
	WorkerList[args.Id-1].Time = time.Now().Unix()
	fmt.Printf("接收到%d心跳信号\n", args.Id-1)
	mu.Unlock()
	return nil
}

分配任务后在切片结构体内更新任务ID信息

WorkerList[args.Id-1].WorkID = value.Id

开启协程每10秒检查切片结构体的时间戳,如果时间戳与当前时间间隔大于10秒,将任务的状态更改为未完成,重新分配。

// Worker信息存储
WorkerList = make([]HeartBeat, 0)
// 每间隔10秒进行验证
go func() {
	for {
		time.Sleep(10 * time.Second)
		mu.Lock()
		for i := 0; i < len(WorkerList); i++ {
			if WorkerList[i].WorkID != -1 && time.Now().Unix()-WorkerList[i].Time > 10 {
				fmt.Printf("%d心跳信号过期\n", i)
				e2 := *(c.UniqueIdSlice[WorkerList[i].WorkID])

				// 这里不太懂为什么要这样写
				if WorkerList[i].WorkID < c.MapTaskNum {
					c.MapListRunning.Remove(&e2)
					c.MapListReady.PushBack(e2.Value)
				} else {
					c.ReduceListRunning.Remove(&e2)
					c.ReduceListReady.PushBack(e2.Value)
				}

				c.WorkerNum -= 1
				WorkerList[i].WorkID = -1
			}
		}
		mu.Unlock()
	}
}()

结束

至此,可以单独通过全部的test,但是仍然存在一些问题

  • 代码可读性不高,不够规范,自己都看不太明白
  • 第一个wc的test和第二个index的test结合在一起通不过,但是可以单独通过两个test
  • 最后异常检测的时候会有worker退不出去
  • 代码运行时间整体比较长,不能满足脚本的运行时间
  • 加锁的地方考虑的比较少,有点过于简单粗暴了

总之基本功能已经没有什么问题了,以后有时间再进行重构。


MIT-6.824 Distributed Systems-Lab 1 MapReduce
https://zhangzhao219.github.io/2022/12/15/6.824/Distributed-Systems-MIT-6.824-Lab-1/
作者
Zhang Zhao
发布于
2022年12月15日
许可协议