Skip to content

Latest commit

 

History

History
778 lines (670 loc) · 26.5 KB

Lab1_MapReduce.md

File metadata and controls

778 lines (670 loc) · 26.5 KB
title date category tags
MIT6.5840(6.824) Lab1: MapReduce
2023-12-23 05:25:18 -0800
CS课程笔记
MIT6.5840(6.824) 2023
Lab笔记
分布式系统
Go

1 实验介绍

本次实验是实现一个简易版本的MapReduce编程框架,官方文档在这里:lab1文档, 强烈建议先阅读MapReduce论文, 难度主要体现在设计上, 实际的代码实现倒是相对简单, 这也得益于go的语言特性, 比CMU15445使用Cpp写代码方便多了。实验需要实现的是CoordinatorWorker的设计, 具体实现细节十分自由(无从下手)

我的代码实现点这里: https://github.com/GFX9/MIT6.5840/tree/lab1

2 既有框架解读

解读现有的框架设计是第一步。

2.1 代码解读

  1. 阅读src/main/mrcoordinator.go可知: 服务进程通过MakeCoordinator启动了一个Coordinator c, c.server()中启用了一个协程用于接受RPC调用:go http.Serve(l, nil), 需要注意的是, 在 Go 的 net/http 包中, 使用 http.Serve(l, nil) 启动 HTTP 服务器以侦听和处理请求时,服务器会为每个进来的请求自动启动一个新的协程。这意味着每个 RPC 调用都是在其自己的独立协程中被处理的,允许并发处理多个请求。因此, 我们的设计可能需要使用锁等同步原语实现共享资源的保护, 同时Coordinator不会主动与Worker通信(除非自己额外实现), 只能通过WorkerRPC通信来完成任务。同时, 当所有任务完成时, Done方法将会返回false, 从而将Coordinator关闭。
  2. 阅读src/main/mrworker.go 可以得知,mrworker.go仅仅通过Worker函数来运行, 因此Worker函数需要完成请求任务、执行任务、报告任务执行状态等多种任务。因此可以猜测,Worker需要再这个函数里不断地轮训Coordinator,根据Coordinator的不同回复驱使当前Worker完成各种任务。

2.2 任务误区解读

  1. MapReduce任务、CoordinatorWorker的关系如何? 这些任务(文中此后称为Task)与Worker是什么关系呢? 是否存在对应关系? 这些对应关系需要记录吗? 通常, 在常见的主从关系中, 主节点需要记录从节点的信息,例如线程id等表名身份的信息, 但在我们的MapReduce中却没有这样的必要, 因为Worker节点是可以扩容的, 而CoordinatorWorker之间只有传递Task相关信息的需求, 因此Coordinator只需要记录Task任务的状态即可, Task分配给Worker后执行可能成功或失败, 因此Coordinator还需要维护任务执行的时间信息, 以便在超时后重新分配任务。因此,MapReduce任务、CoordinatorWorker的关系可以参考下图:

    MapReduce_relation

    Worker可能在不同时间执行不同的Task, 也可能什么也不做(初始状态或等候所有Map Task完成时可能会闲置)

  2. MapReduce任务有多少个? 如何分配?

    • Map Task实际上在此实验中被简化了, 每个Map Task的任务就是处理一个.txt文件, 因此Map Task的数量实际上就是.txt文件的数量。 因此, 每个.txt文件对应的Map Task需要Coordinator记录其执行情况并追踪。
    • Reduce Task的数量是nReduce。由于Map Task会将文件的内容分割为指定的nReduce份, 每一份应当由序号标明, 拥有这样的序号的多个Map Task的输出汇总起来就是对应的Reduce Task的输入。
  3. 中间文件的格式是怎么样的? Reduce任务如何选择中间文件作为输入? 因为Map Task分割采用的是统一的哈希函数ihash, 所以相同的key一定会被Map Task输出到格式相同的中间文件上。例如在wc任务中, Map Task 1Map Task 2输入文件中都存在hello这个词, Map Task 1中所有的hello会被输出到mr-out-1-5这个中间文件, 1代表Map Task序号, 5代表被哈希值取模的结果。那么,Map Task 2中所有的hello会被输出到mr-out-2-5这个中间文件。那么Reduce Task 5读取的就是形如mr-out-*-5这样的文件。

3 设计与实现

3.1 RPC设计

3.1.1. 消息类型

,通信时首先需要确定这个消息是什么类型, 通过前述分析可知, 通信的信息类型包括:

  • Worker请求任务
  • Coordinator分配ReduceMap任务
  • Worker报告ReduceMap任务的执行情况(成功或失败)
  • Coordinator告知Worker休眠(暂时没有任务需要执行)
  • Coordinator告知Worker退出(所有任务执行成功)

每一种消息类型会需要附带额外的信息, 例如Coordinator分配任务需要告知任务的ID, Map任务还需要告知NReduce,和输入文件名。 综上考虑, 消息类型的定义如下(SendReply是从Worker视角出发的):

const (
    AskForTask      MsgType = iota // `Worker`请求任务
    MapTaskAlloc                   // `Coordinator`分配`Map`任务
    ReduceTaskAlloc                // `Coordinator`分配`Reduce`任务
    MapSuccess                     // `Worker`报告`Map`任务的执行成功
    MapFailed                      // `Worker`报告`Map`任务的执行失败
    ReduceSuccess                  // `Worker`报告`Reduce`任务的执行成功
    ReduceFailed                   //`Worker`报告`Reduce`任务的执行失败
    Shutdown                       // `Coordinator`告知`Worker`退出(所有任务执行成功)
    Wait                           //`Coordinator`告知`Worker`休眠(暂时没有任务需要执行)
)

type MessageSend struct {
    MsgType MsgType
    TaskID  int // `Worker`回复的消息类型如MapSuccess等需要使用
}

type MessageReply struct {
    MsgType  MsgType
    NReduce  int    // MapTaskAlloc需要告诉Map Task 切分的数量
    TaskID   int    // 任务Id用于选取输入文件
    TaskName string // MapSuccess专用: 告知输入.txt文件的名字
}

3.1.2 通信函数设计

在我的设计中,Worker只需要有2个动作:

  • Coordinator请求Task
  • Coordinator报告之前的Task的执行情况

因此, worker.go中通信函数应该是下面的样子:

func CallForReportStatus(succesType MsgType, taskID int) error {
    // 报告Task执行情况
    // declare an argument structure.
    args := MessageSend{
        MsgType: succesType,
        TaskID:  taskID,
    }
    
    err := call("Coordinator.NoticeResult", &args, nil)

    return err
}
func CallForTask() *MessageReply {
    // 请求一个Task
    // declare an argument structure.
    args := MessageSend{
        MsgType: AskForTask,
    }

    // declare a reply structure.
    reply := MessageReply{}

    // send the RPC request, wait for the reply.
    err := call("Coordinator.AskForTask", &args, &reply)
    if err == nil {
        // fmt.Printf("TaskName %v, NReduce %v, taskID %v\n", reply.TaskName, reply.NReduce, reply.TaskID)
        return &reply
    } else {
        // log.Println(err.Error())
        return nil
    }
}

coordinator.go有相应的处理函数:

func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {}
func (c *Coordinator) NoticeResult(req *MessageSend, reply *MessageReply) error {}

这些处理函数则需要进一步的设计。

3.2 Worker设计

3.2.1 Worker主函数设计

由之前的分析可以看出,Woker所做的内容就是不断的请求任务、执行任务和回复任务执行情况,因此,可以很容易地写出Worker函数:

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	for {
		// 循环请求
		replyMsg := CallForTask()

		switch replyMsg.MsgType {
		case MapTaskAlloc:
			err := HandleMapTask(replyMsg, mapf)
			if err == nil {
				_ = CallForReportStatus(MapSuccess, replyMsg.TaskID)
			} else {
				// log.Println("Worker: Map Task failed")
				_ = CallForReportStatus(MapFailed, replyMsg.TaskID)
			}
		case ReduceTaskAlloc:
			err := HandleReduceTask(replyMsg, reducef)
			if err == nil {
				_ = CallForReportStatus(ReduceSuccess, replyMsg.TaskID)
			} else {
				// log.Println("Worker: Map Task failed")
				_ = CallForReportStatus(ReduceFailed, replyMsg.TaskID)
			}
		case Wait:
			time.Sleep(time.Second * 10)
		case Shutdown:
			os.Exit(0)
		}
		time.Sleep(time.Second)
	}
}

3.2.2 Map Task执行函数

HandleMapTask函数是执行具体的MapTask, 这样部分很简单, 可以从mrsequential.go中偷代码:

func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {
	file, err := os.Open(reply.TaskName)
	if err != nil {
		return err
	}
	defer file.Close()

	content, err := io.ReadAll(file)
	if err != nil {
		return err
	}

	// 进行mapf
	kva := mapf(reply.TaskName, string(content))
	sort.Sort(ByKey(kva))

	oname_prefix := "mr-out-" + strconv.Itoa(reply.TaskID) + "-"

	key_group := map[string][]string{}
	for _, kv := range kva {
		key_group[kv.Key] = append(key_group[kv.Key], kv.Value)
	}

	// 先清理可能存在的垃圾
	// TODO: 原子重命名的方法
	_ = DelFileByMapId(reply.TaskID, "./")

	for key, values := range key_group {
		redId := ihash(key)
		oname := oname_prefix + strconv.Itoa(redId%reply.NReduce)
		var ofile *os.File
		if _, err := os.Stat(oname); os.IsNotExist(err) {
			// 文件夹不存在
			ofile, _ = os.Create(oname)
		} else {
			ofile, _ = os.OpenFile(oname, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		}
		enc := json.NewEncoder(ofile)
		for _, value := range values {
			err := enc.Encode(&KeyValue{Key: key, Value: value})
			if err != nil {
				ofile.Close()
				return err
			}
		}
		ofile.Close()
	}
	return nil
}

虽然偷了很多代码, 但是有家店需要注意, 因为之前的Worker可能写入了一部分数据到中间文件后失败的情况, 之后Coordinator重新分配任务时, 文件是可能存在脏数据的, 因此需要先执行清理:

_ = DelFileByMapId(reply.TaskID, "./")
func DelFileByMapId(targetNumber int, path string) error {
	// 创建正则表达式,X 是可变的指定数字
	pattern := fmt.Sprintf(`^mr-out-%d-\d+$`, targetNumber)
	regex, err := regexp.Compile(pattern)
	if err != nil {
		return err
	}

	// 读取当前目录中的文件
	files, err := os.ReadDir(path)
	if err != nil {
		return err
	}

	// 遍历文件,查找匹配的文件
	for _, file := range files {
		if file.IsDir() {
			continue // 跳过目录
		}
		fileName := file.Name()
		if regex.MatchString(fileName) {
			// 匹配到了文件,删除它
			filePath := filepath.Join(path, file.Name())
			err := os.Remove(filePath)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

DelFileByMapId函数删除特定Map Task的输出文件, 但这样的执行存在一定隐患:

  • 首先是Coordinator只能重新分配一个Worker执行Coordinator认为死掉的任务, 这一点可以通过加锁和记录时间戳完成, 后续会讲到
  • 其次, 如果之前死掉的Worker又活了,其可能和现在的Worker的输出又有交叉了

因此, 目前的实现是存在一定漏洞的, 有以下的改进方案:

  1. 参考官方的提示, 先为当前的中间文件使用临时名, 完成操作后再进行原子重命名
  2. 通过加文件锁的方式保护文件

Ps: 后续有时间再改代码吧

3.2.3 Reduce Task执行函数

Reduce Task手机对应序号的中间文件, 汇总后应用指定的reduce函数,实现也比较简单:

func HandleReduceTask(reply *MessageReply, reducef func(string, []string) string) error {
	key_id := reply.TaskID

	k_vs := map[string][]string{}

	fileList, err := ReadSpecificFile(key_id, "./")

	if err != nil {
		return err
	}

	// 整理所有的中间文件
	for _, file := range fileList {
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			k_vs[kv.Key] = append(k_vs[kv.Key], kv.Value)
		}
		file.Close()
	}

	// 获取所有的键并排序
	var keys []string
	for k := range k_vs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	oname := "mr-out-" + strconv.Itoa(reply.TaskID)
	ofile, err := os.Create(oname)
	if err != nil {
		return err
	}
	defer ofile.Close()

	for _, key := range keys {
		output := reducef(key, k_vs[key])
		_, err := fmt.Fprintf(ofile, "%v %v\n", key, output)
		if err != nil {
			return err
		}
	}

	DelFileByReduceId(reply.TaskID, "./")

	return nil
}

需要注意的是, 我收集文件内容是使用了map, 而迭代map时, key的顺序是随机的, 因此需要先进行对key排序的操作:

// 获取所有的键并排序
	var keys []string
	for k := range k_vs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

其实这里也存在漏洞:同样就是死了的Worker突然复活了怎么办的问题,相比Map Task来说, 这里虽然相对不容易出错, 因为这里对多个中间文件只存在读取而不存在写入, 将内容读取到内存中不会有冲突的。出错只可能在将数据在写入到指定的文件时, os.Create(oname)也会存在竞争条件:因为网络等问题,Coordinator启动了多个Worker, 多个Worker同时运行,并且都尝试创建同一个文件名oname, 假设一个Worker先创建了oname并写入了一部分数据,当另一个Worker再次调用os.Create(oname)时,之前的数据将会被清空。这意味着第一个Worker在接下来的写操作中不会出现错误,但它写入的部分数据会丢失,因为第二个Worker已经截断了文件。

因此, 目前的实现是也存在一定漏洞的, 有以下的改进方案:

  1. 参考官方的提示, 先为当前的中间文件使用临时名, 完成操作后再进行原子重命名
  2. 通过加文件锁的方式保护文件

3.3 Coordinator设计

3.3.1 TaskInfo设计

首先需要考虑的是, 如何维护Task的执行信息, Task执行状态包括了: 未执行、执行者、执行失败、执行完成。 这里有一个很重要的问题需要考虑, 超时的任务时什么状态呢?因为在我的设计中,CoordinatorWorker是通过RPC来驱动彼此运行的, 当然你也可以启动一个goroutine间隔地检查是否超时, 但为了使设计更简单, 我们可以这样设计检查超时的方案:

  1. 为每个Worker分配Task时需要记录Task被分配的时间戳, 并将其状态置为running
  2. 为每个Worker分配Task, 遍历存储TaskInfo的数据结构, 检查每一个状态为runningTask的时间戳是否与当前时间戳差距大于10s, 如果是, 则代表这个Task超时了, 立即将它分配给当前请求的Worker, 并更新其时间戳
  3. 如果导致Task超时的老旧的Woker之后又完成了, 结果也就是这个Task返回了多次执行成功的报告而已, 可忽略

PS: Worker执行失败有2种, 一种是Worker没有崩溃但发现了error, 这时Worker会将错误报告给Coordinator, Coordinator会将其状态设置为failed, 另一种情况是Worker崩溃了, 连通知都做不到, 这就以超时体现出来, 处理好超时即可

因此, 我如下设计TaskInfo的数据结构:

type taskStatus int

// Task 状态
const (
	idle     taskStatus = iota // 闲置未分配
	running                    // 正在运行
	finished                   // 完成
	failed                     //失败
)

// Map Task 执行状态
type MapTaskInfo struct {
	TaskId    int        // Task 序号
	Status    taskStatus // 执行状态
	StartTime int64      // 开始执行时间戳
}

// Reduce Task 执行状态
type ReduceTaskInfo struct {
	// ReduceTask的 序号 由数组下标决定, 不进行额外存储
	Status    taskStatus // 执行状态
	StartTime int64      // 开始执行时间戳
}

type Coordinator struct {
	// Your definitions here.
	NReduce     int                     // the number of reduce tasks to use.
	MapTasks    map[string]*MapTaskInfo //MapTaskInfo
	mu          sync.Mutex              // 一把大锁保平安
	ReduceTasks []*ReduceTaskInfo       // ReduceTaskInfo
}

添加TaskInfo的初始化方法, 并在MakeCoordinator中调用:

func (c *Coordinator) initTask(files []string) {
	for idx, fileName := range files {
		c.MapTasks[fileName] = &MapTaskInfo{
			TaskId: idx,
			Status: idle,
		}
	}
	for idx := range c.ReduceTasks {
		c.ReduceTasks[idx] = &ReduceTaskInfo{
			Status: idle,
		}
	}
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		NReduce:     nReduce,
		MapTasks:    make(map[string]*MapTaskInfo),
		ReduceTasks: make([]*ReduceTaskInfo, nReduce),
	}

	// Your code here.
	// 由于每一个文件名就是一个map task ,需要初始化任务状态
	c.initTask(files)

	c.server()
	return &c
}

3.3.2 RPC 响应函数-AskForTask

这部分算是较为复杂的, 其逻辑如下:

  1. 如果有闲置的任务(idle)和之前执行失败(failed)的Map Task, 选择这个任务进行分配
  2. 如果检查到有超时的任务Map Task, 选择这个任务进行分配
  3. 如果以上的Map Task均不存在, 但Map Task又没有全部执行完成, 告知Worker先等待
  4. Map Task全部执行完成的情况下, 按照12相同的逻辑进行Reduce Task的分配
  5. 所有的Task都执行完成了, 告知Worker退出

因此, AskForTask代码如下:

func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {
	if req.MsgType != AskForTask {
		return BadMsgType
	}
	// 选择一个任务返回给worker
	c.mu.Lock()
	defer c.mu.Unlock()

	count_map_success := 0
	for fileName, taskinfo := range c.MapTasks {
		alloc := false

		if taskinfo.Status == idle || taskinfo.Status == failed {
			// 选择闲置或者失败的任务
			alloc = true
		} else if taskinfo.Status == running {
			// 判断其是否超时, 超时则重新派发
			curTime := time.Now().Unix()
			if curTime-taskinfo.StartTime > 10 {
				taskinfo.StartTime = curTime
				alloc = true
			}
		} else {
			count_map_success++
		}

		if alloc {
			// 将未分配的任务和已经失败的任务分配给这个worker
			reply.MsgType = MapTaskAlloc
			reply.TaskName = fileName
			reply.NReduce = c.NReduce
			reply.TaskID = taskinfo.TaskId

			// log.Printf("coordinator: apply Map Task: taskID = %v\n", reply.TaskID)

			// 修改状态信息
			taskinfo.Status = running
			taskinfo.StartTime = time.Now().Unix()
			return nil
		}
	}

	if count_map_success < len(c.MapTasks) {
		// map任务没有可以分配的, 但都还未完成
		reply.MsgType = Wait
		return nil
	}

	count_reduce_success := 0
	// 运行到这里说明map任务都已经完成
	for idx, taskinfo := range c.ReduceTasks {
		alloc := false
		if taskinfo.Status == idle || taskinfo.Status == failed {
			alloc = true
		} else if taskinfo.Status == running {
			// 判断其是否超时, 超时则重新派发
			curTime := time.Now().Unix()
			if curTime-taskinfo.StartTime > 10 {
				taskinfo.StartTime = curTime
				alloc = true
			}
		} else {
			count_reduce_success++
		}

		if alloc {
			// 分配给其一个Reduce任务
			reply.MsgType = ReduceTaskAlloc
			reply.TaskID = idx

			// log.Printf("coordinator: apply Reduce Task: taskID = %v\n", reply.TaskID)

			taskinfo.Status = running
			taskinfo.StartTime = time.Now().Unix()
			return nil
		}
	}

	if count_reduce_success < len(c.ReduceTasks) {
		// reduce任务没有可以分配的, 但都还未完成
		reply.MsgType = Wait
		return nil
	}

	// 运行到这里说明所有任务都已经完成
	reply.MsgType = Shutdown

	return nil
}

在这里, 我对数据的保护是一把大锁保平安, 这其实可以优化的

3.3.3 RPC 响应函数-NoticeResult

这个函数就简单很多了, 只需要改变对应TaskInfo的状态即可

func (c *Coordinator) NoticeResult(req *MessageSend, reply *MessageReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if req.MsgType == MapSuccess {
		for _, v := range c.MapTasks {
			if v.TaskId == req.TaskID {
				v.Status = finished
				// log.Printf("coordinator: map task%v finished\n", v.TaskId)
				break
			}
		}
	} else if req.MsgType == ReduceSuccess {
		c.ReduceTasks[req.TaskID].Status = finished
		// log.Printf("coordinator: reduce task%v finished\n", req.TaskID)
	} else if req.MsgType == MapFailed {
		for _, v := range c.MapTasks {
			if v.TaskId == req.TaskID {
				v.Status = failed
				// log.Printf("coordinator: map task%v failed\n", v.TaskId)
				break
			}
		}
	} else if req.MsgType == ReduceFailed {
		c.ReduceTasks[req.TaskID].Status = failed
		// log.Printf("coordinator: reduce task%v failed\n", req.TaskID)
	}
	return nil
}

3.3.4 Done方法

Done方法是最简单的, 遍历TaskInfo的数据结构, 如果全部完成则返回True,

func (c *Coordinator) Done() bool {
	// Your code here.
	// 先确认mapTask完成
	for _, taskinfo := range c.MapTasks {
		if taskinfo.Status != finished {
			return false
		}
	}

	// fmt.Println("Coordinator: All map task finished")

	// 再确认Reduce Task 完成
	for _, taskinfo := range c.ReduceTasks {
		if taskinfo.Status != finished {
			return false
		}
	}

	// fmt.Println("Coordinator: All reduce task finished")

	// time.Sleep(time.Second * 5)

	return true
}

有一个小细节, time.Sleep(time.Second * 5)是为了让Coordinator延迟关闭, 这样可以留出时间告知Worker退出, 也可以直接注释掉它, 让测试跑得更快

4 测试和优化

4.1 原实现测试

运行测试:

$ time bash test-mr.sh

结果如下图, 耗时3m3s

MapReduce-Test

4.2 优化

4.2.1 原子重命名

按照官方提示, 使用原子重命名避免竞争, 修改HandleMapTask函数:

func HandleMapTask(reply *MessageReply, mapf func(string, string) []KeyValue) error {
	file, err := os.Open(reply.TaskName)
	if err != nil {
		return err
	}
	defer file.Close()

	content, err := io.ReadAll(file)
	if err != nil {
		return err
	}

	kva := mapf(reply.TaskName, string(content))
	sort.Sort(ByKey(kva))

	tempFiles := make([]*os.File, reply.NReduce)
	encoders := make([]*json.Encoder, reply.NReduce)

	for _, kv := range kva {
		redId := ihash(kv.Key) % reply.NReduce
		if encoders[redId] == nil {
			tempFile, err := ioutil.TempFile("", fmt.Sprintf("mr-map-tmp-%d", redId))
			if err != nil {
				return err
			}
			defer tempFile.Close()
			tempFiles[redId] = tempFile
			encoders[redId] = json.NewEncoder(tempFile)
		}
		err := encoders[redId].Encode(&kv)
		if err != nil {
			return err
		}
	}

	for i, file := range tempFiles {
		if file != nil {
			fileName := file.Name()
			file.Close()
			newName := fmt.Sprintf("mr-out-%d-%d", reply.TaskID, i)
			if err := os.Rename(fileName, newName); err != nil {
				return err
			}
		}
	}

	return nil
}

运行测试:

$ time bash test-mr.sh

结果如下图, 耗时2m52s, 原子重命名节省了原实现中的清理耗费的时间, 节约了差不多10s的样子

MapReduce-Test-2

4.2.2 锁细化

我们可以发现, 在Worker请求任务时, Map Task是需要先全部执行成功的, 因此我们可以增加一个字段记录Map Task是否全部完成, 同时为MapTaskInfoReduceTaskInfo分别实现设计一个锁来取代原来的大锁, 代码如下:

type Coordinator struct {
	// Your definitions here.
	NReduce       int                     // the number of reduce tasks to use.
	MapTasks      map[string]*MapTaskInfo //MapTaskInfo
	MapSuccess    bool                    // Map Task 是否全部完成
	muMap         sync.Mutex              // Map 锁, 保护 MapTasks
	ReduceTasks   []*ReduceTaskInfo       // ReduceTaskInfo
	ReduceSuccess bool                    // Reduce Task 是否全部完成
	muReduce      sync.Mutex              // Reduce 锁, 保护 ReduceTasks
}

func (c *Coordinator) AskForTask(req *MessageSend, reply *MessageReply) error {
	if req.MsgType != AskForTask {
		return BadMsgType
	}
	if !c.MapSuccess {
		// 选择一个 Map Task 返回给worker

		c.muMap.Lock()

		count_map_success := 0
		for fileName, taskinfo := range c.MapTasks {
			alloc := false

			if taskinfo.Status == idle || taskinfo.Status == failed {
				// 选择闲置或者失败的任务
				alloc = true
			} else if taskinfo.Status == running {
				// 判断其是否超时, 超时则重新派发
				curTime := time.Now().Unix()
				if curTime-taskinfo.StartTime > 10 {
					taskinfo.StartTime = curTime
					alloc = true
				}
			} else {
				count_map_success++
			}

			if alloc {
				// 将未分配的任务和已经失败的任务分配给这个worker
				reply.MsgType = MapTaskAlloc
				reply.TaskName = fileName
				reply.NReduce = c.NReduce
				reply.TaskID = taskinfo.TaskId

				// log.Printf("coordinator: apply Map Task: taskID = %v\n", reply.TaskID)

				// 修改状态信息
				taskinfo.Status = running
				taskinfo.StartTime = time.Now().Unix()
				c.muMap.Unlock()
				return nil
			}
		}

		c.muMap.Unlock()

		if count_map_success < len(c.MapTasks) {
			// map任务没有可以分配的, 但都还未完成
			reply.MsgType = Wait
			return nil
		} else {
			c.MapSuccess = true
		}
	}

	if !c.ReduceSuccess {
		// 选择一个 Reduce Task 返回给worker
		c.muReduce.Lock()

		count_reduce_success := 0
		// 运行到这里说明map任务都已经完成
		for idx, taskinfo := range c.ReduceTasks {
			alloc := false
			if taskinfo.Status == idle || taskinfo.Status == failed {
				alloc = true
			} else if taskinfo.Status == running {
				// 判断其是否超时, 超时则重新派发
				curTime := time.Now().Unix()
				if curTime-taskinfo.StartTime > 10 {
					taskinfo.StartTime = curTime
					alloc = true
				}
			} else {
				count_reduce_success++
			}

			if alloc {
				// 分配给其一个Reduce任务
				reply.MsgType = ReduceTaskAlloc
				reply.TaskID = idx

				// log.Printf("coordinator: apply Reduce Task: taskID = %v\n", reply.TaskID)

				taskinfo.Status = running
				taskinfo.StartTime = time.Now().Unix()

				c.muReduce.Unlock()
				return nil
			}
		}

		c.muReduce.Unlock()

		if count_reduce_success < len(c.ReduceTasks) {
			// reduce任务没有可以分配的, 但都还未完成
			reply.MsgType = Wait
			return nil
		} else {
			c.ReduceSuccess = true
		}
	}

	// 运行到这里说明所有任务都已经完成
	reply.MsgType = Shutdown

	return nil
}

运行测试:

$ time bash test-mr.sh

结果如下图, 耗时2m39s, 原子重命名节省了原实现中的清理耗费的时间, 又节约了差不多13s的样子 另外别忘了NoticeResult也要相应地修改, 由于比较简单就不列出来了, 可以直接看仓库代码。

MapReduce-Test-3