Lab1: MapReduce(个人纪录版)
(烂完了,深刻感觉到自己有多菜,各个方面)
最初的版本是全部自己写的,测试的时候会出现时而成功、时而失败的情况。重写两遍的我已经不想再看代码了,后面让AI帮忙分析,然后一点点地修改,最后发现出问题的地方是在 Call4Map():忘记更新那个状态了。
早些时候做的设计重复且没有用(除了对MapReduce的理解问题,还有就是刚接触的时候感觉逻辑复杂,一度不想写,断断续续写了好久,同时对go也不是很熟悉),后面改了好几次才有git上第一次提交的版本,第一次编译的时候错误很多,改了很久。
总体来说,确实没那么复杂,懂了才敢这么说。下面就不对代码做过多分析了, 仅仅说一下自己的思路,方便后面的复习。
关于怎么通信的没有关注,似乎是 `http.Serve` 吧,`server` 函数很重要,后面看看。
Coordinator
作为一个协调器,需要考虑到维护一些状态,主要就是下面几个功能:
- 分配Map任务
- 对已经完成的Map做标记
- 分配Reduce任务
- 对已经完成的Reduce做标记
剩下的都是完成的事情。其中需要注意的是并发的情况,共享的数据是不能同时访问的,go的锁机制非常舒服,几乎不用怎么考虑锁。
下面这几个变量很重要,注释有说明。这里最初的设计非常失败,这里都是后面修改的,不然看不了。
nReduce int // reduce 的任务数量,最终的 reduce 至少要分配几次
// map
MapTaskNums map[string]int // 每一个文件都只能对应一个 TaskNum
mapFinNum int // 已经完成的 Map 任务数量
Files map[string]int // 0: unFinished. 1: allocated. 2: completed. 每个任务的状态
MapRime map[string]time.Time // 任务开始的时间,超时重新分配
// reduce
reduceFin []int // 0: unFinished. 1: allocated. 2: completed. 每个任务的状态
reduceFinNum int // 已经完成的 Reduce 任务数量
ReduceRime []time.Time // 任务开始的时间,超时重新分配
下面是完整代码,关于时间的我直接AI辅助了,之前没有接触过,AI写例子,我来抄
package mrimport ("log""net""net/http""net/rpc""os""sync""time"
)type Coordinator struct {// Your definitions here.mu sync.MutexnReduce int//mapMapTaskNums map[string]int // for fileNamemapFinNum intFiles map[string]int // 0: unFinished. 1:allocated. 2:completed.MapRime map[string]time.Time//reducereduceFin []int // 0: unFinished. 1:allocated. 2:completed.reduceFinNum intReduceRime []time.Time
}func (c *Coordinator) AllocateMapTask(args *MapArgs, reply *MapReply) error {reply.FileName, reply.TaskNum = c.allocateMap()reply.NReduce = c.nReducereturn nil
}func (c *Coordinator) allocateMap() (string, int) {c.mu.Lock()defer c.mu.Unlock()if c.mapFinNum == len(c.Files) {return "", -1}for filename, allocated := range c.Files {if allocated == 0 {c.Files[filename] = 1c.MapRime[filename] = time.Now()return filename, c.MapTaskNums[filename]}if allocated == 1 && !c.MapRime[filename].IsZero() && time.Since(c.MapRime[filename]) > 10*time.Second {c.MapRime[filename] = time.Now()return filename, c.MapTaskNums[filename]}}return "", -2
}func (c *Coordinator) FinishedMap(args *MapFinArgs, reply *MapFinReply) error {c.mu.Lock()defer c.mu.Unlock()if args.FileName != "" {if c.Files[args.FileName] != 2 {c.Files[args.FileName] = 2c.mapFinNum++}if c.mapFinNum == len(c.Files) {reply.AllFinished = true}}return nil
}func (c *Coordinator) AllocateReduceTask(args *ReduceArgs, reply *ReduceReply) error {reply.FileNum = len(c.Files)reply.ReduceNum = c.allocateReduce()return nil
}func (c *Coordinator) allocateReduce() int {c.mu.Lock()defer c.mu.Unlock()// 如果 map 阶段还没完成,则不分配 reduce 任务if c.mapFinNum != len(c.Files) {return -1}if c.reduceFinNum == c.nReduce {return -2}for idx, allocated := range c.reduceFin {if allocated == 0 {c.reduceFin[idx] = 1c.ReduceRime[idx] = time.Now()return idx}if allocated == 1 && !c.ReduceRime[idx].IsZero() && time.Since(c.ReduceRime[idx]) > 10*time.Second {c.ReduceRime[idx] = time.Now()return idx}}return -1
}func (c *Coordinator) FinishedReduce(args *ReduceFinArgs, reply *ReduceFinReply) error {c.mu.Lock()defer c.mu.Unlock()if c.reduceFin[args.TaskNum] != 2 {c.reduceFin[args.TaskNum] = 2c.reduceFinNum++}if c.reduceFinNum == c.nReduce {reply.AllFinished = true}return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {c.mu.Lock()defer c.mu.Unlock()ret := falseif c.reduceFinNum == c.nReduce {time.Sleep(5 * time.Second)ret = true}// Your code here.return ret
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}c.nReduce = nReducec.Files = make(map[string]int)c.MapTaskNums = make(map[string]int)// Your code here.for idx, filename := range files {c.Files[filename] = 0c.MapTaskNums[filename] = idx}c.reduceFin = make([]int, nReduce)c.MapRime = make(map[string]time.Time, len(c.Files))c.ReduceRime = make([]time.Time, nReduce)c.server()return &c
}
Worker
工作节点,执行Map、Reduce任务的,现在看来没有什么,最初没有考虑到退出的事情,没有进行无限循环(也是因为对go不熟,并且没有这方面的经验)。其他的还好,就是最初写的Call4Map最后被忽视,一直错误,并且一度觉得自己逻辑正确,怎么就不对呢?
package mrimport ("encoding/json""fmt""hash/fnv""io/ioutil""log""net/rpc""os""sort""time"
)// Map functions return a slice of KeyValue.
// KeyValue is a single key/value pair.
type KeyValue struct {
	Key   string // key the pair is sorted and grouped by
	Value string // value associated with Key
}

// for sorting by key.
// ByKey is a slice of KeyValue with Len, Swap and Less methods that order
// the pairs by ascending Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (s ByKey) Len() int {
	return len(s)
}

// Swap exchanges the pairs at positions i and j.
func (s ByKey) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

// Less reports whether the key at position i sorts before the key at j.
func (s ByKey) Less(i, j int) bool {
	return s[j].Key > s[i].Key
}

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
	// 32-bit FNV-1a, with the top bit masked off so the result is never
	// negative.
	const nonNegativeMask = 0x7fffffff

	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	return int(hasher.Sum32() & nonNegativeMask)
}

// readfile returns the whole content of filename as a string. The process
// is terminated through log.Fatalf when the file cannot be opened or read.
func readfile(filename string) string {
	f, openErr := os.Open(filename)
	if openErr != nil {
		log.Fatalf("cannot open %v", filename)
	}
	defer f.Close()

	data, readErr := ioutil.ReadAll(f)
	if readErr != nil {
		log.Fatalf("cannot read %v", filename)
	}
	return string(data)
}

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.// uncomment to send the Example RPC to the coordinator.// CallExample()for {filename, num4Task, nReduce := Call4Map()if num4Task == -1 {break}if filename == "" {time.Sleep(time.Second)continue}// read file contentcontent := readfile(filename)// map file and save to filekva := mapf(filename, content)ofiles := make([]*os.File, nReduce)encoders := make([]*json.Encoder, nReduce)for i := 0; i < nReduce; i++ {oname := fmt.Sprintf("mr-%d-%d", num4Task, i)ofiles[i], _ = os.Create(oname)encoders[i] = json.NewEncoder(ofiles[i])}for _, kv := range kva {reduceIdx := ihash(kv.Key) % nReduceif err := encoders[reduceIdx].Encode(&kv); err != nil {fmt.Println("map write to file error!")}}for i := 0; i < nReduce; i++ {ofiles[i].Close()}// completed map task, tell coordinatorargs := MapFinArgs{filename}reply := MapFinReply{}call("Coordinator.FinishedMap", &args, &reply)}for {fileNum, reduceNum := Call4Reduce()if reduceNum == -2 {break}if reduceNum == -1 {time.Sleep(time.Second)continue}kva := []KeyValue{}for i := 0; i < fileNum; i++ {taskFile := fmt.Sprintf("mr-%d-%d", i, reduceNum)file, err := os.Open(taskFile)if err != nil {continue}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}file.Close()}sort.Sort(ByKey(kva))oname := fmt.Sprintf("mr-out-%d", reduceNum)ofile, _ := os.Create(oname)i := 0for i < len(kva) {j := i + 1for j < len(kva) && kva[j].Key == kva[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, kva[k].Value)}output := reducef(kva[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)i = j}ofile.Close()args := ReduceFinArgs{reduceNum}reply := ReduceFinReply{}call("Coordinator.FinishedReduce", &args, &reply)}
}func Call4Map() (string, int, int) {args := MapArgs{0}reply := MapReply{}ok := call("Coordinator.AllocateMapTask", &args, &reply)if !ok {return "", -1, 0 //直接返回,退出map阶段}return reply.FileName, reply.TaskNum, reply.NReduce
}func Call4Reduce() (int, int) {args := ReduceArgs{}reply := ReduceReply{}ok := call("Coordinator.AllocateReduceTask", &args, &reply)if !ok {return 0, -2 //直接返回,退出reduce阶段}return reply.FileNum, reply.ReduceNum
}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}
RPC
RPC 部分定义了通信过程中必须传输的数据结构,字段名的首字母一定要大写。里面还是有多余的部分,懒得删了,也不知道怎么设计更好,但是调用时必须带个参数。最后附上一个 AI 设计的版本,更好、更优雅,但是用它还需要改动其他的代码。设计这部分还是要好好学习,我的代码真是丑爆了。
package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("os""strconv"
)// Add your RPC definitions here.
// MapArgs is the request of Coordinator.AllocateMapTask.
type MapArgs struct {
	N int // placeholder; the worker always sends 0
}

// MapReply is the response of Coordinator.AllocateMapTask.
type MapReply struct {
	FileName string // input file to map; empty when no task was assigned
	TaskNum  int    // map task number; -1: map phase finished, -2: busy, wait
	NReduce  int    // number of reduce tasks
}

// MapFinArgs is the request of Coordinator.FinishedMap.
type MapFinArgs struct {
	FileName string // input file whose map task has completed
}

// MapFinReply is the response of Coordinator.FinishedMap.
type MapFinReply struct {
	AllFinished bool // true once every map task is completed
}

// ReduceArgs is the request of Coordinator.AllocateReduceTask.
type ReduceArgs struct {
	FileName string // left empty by the worker
}

// ReduceReply is the response of Coordinator.AllocateReduceTask.
type ReduceReply struct {
	FileNum int // number of map input files
	// -1: map phase not finished or nothing free, wait; -2: reduce phase
	// finished; any other value is the reduce task number.
	ReduceNum int // among of nReduce
}

// ReduceFinArgs is the request of Coordinator.FinishedReduce.
type ReduceFinArgs struct {
	TaskNum int // reduce task number that has completed
}

// ReduceFinReply is the response of Coordinator.FinishedReduce.
type ReduceFinReply struct {
	AllFinished bool // true once every reduce task is completed
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	// The numeric user id is appended to the fixed prefix.
	const prefix = "/var/tmp/5840-mr-"
	return prefix + strconv.Itoa(os.Getuid())
}
AI
Workerfunc Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// ========== Map 阶段 ==========for {reply := requestMapTask()switch reply.Status {case TaskStatusAllDone:// Map 阶段全部完成breakcase TaskStatusWait:// 暂时没有任务,等待time.Sleep(time.Second)continuecase TaskStatusReady:// 执行 Map 任务executeMapTask(mapf, reply)}if reply.Status == TaskStatusAllDone {break}}// ========== Reduce 阶段 ==========for {reply := requestReduceTask()switch reply.Status {case TaskStatusAllDone:// Reduce 阶段全部完成,Worker 退出returncase TaskStatusWait:// 暂时没有任务,等待time.Sleep(time.Second)continuecase TaskStatusReady:// 执行 Reduce 任务executeReduceTask(reducef, reply)}}
}func requestMapTask() AllocateMapTaskReply {args := AllocateMapTaskArgs{}reply := AllocateMapTaskReply{}ok := call("Coordinator.AllocateMapTask", &args, &reply)if !ok {// RPC 失败,Coordinator 可能已退出reply.Status = TaskStatusAllDone}return reply
}func executeMapTask(mapf func(string, string) []KeyValue, task AllocateMapTaskReply) {// 读取文件content := readfile(task.FileName)kva := mapf(task.FileName, content)// 写入中间文件writeIntermediateFiles(kva, task.TaskNum, task.NReduce)// 报告完成args := ReportMapTaskArgs{FileName: task.FileName,TaskNum: task.TaskNum,Version: task.Version,}reply := ReportMapTaskReply{}call("Coordinator.ReportMapTask", &args, &reply)
}func requestReduceTask() AllocateReduceTaskReply {args := AllocateReduceTaskArgs{}reply := AllocateReduceTaskReply{}ok := call("Coordinator.AllocateReduceTask", &args, &reply)if !ok {// RPC 失败reply.Status = TaskStatusAllDone}return reply
}func executeReduceTask(reducef func(string, []string) string, task AllocateReduceTaskReply) {// 读取中间文件kva := readIntermediateFiles(task.ReduceNum, task.NMapTasks)// 排序sort.Sort(ByKey(kva))// 写入输出文件writeOutputFile(reducef, kva, task.ReduceNum)// 报告完成args := ReportReduceTaskArgs{ReduceNum: task.ReduceNum,Version: task.Version,}reply := ReportReduceTaskReply{}call("Coordinator.ReportReduceTask", &args, &reply)
}
coordinator
type Coordinator struct {mu sync.Mutex// Map 任务相关nMapTasks intmapTasks map[string]*TaskInfo // filename -> task infomapFinished int// Reduce 任务相关nReduce intreduceTasks []*TaskInforeduceFinished int
}type TaskInfo struct {status int // 0: idle, 1: running, 2: finishedversion int // 任务版本号startTime time.Time // 任务开始时间
}func (c *Coordinator) AllocateMapTask(args *AllocateMapTaskArgs, reply *AllocateMapTaskReply) error {c.mu.Lock()defer c.mu.Unlock()// 所有任务已完成if c.mapFinished == c.nMapTasks {reply.Status = TaskStatusAllDonereturn nil}// 寻找可分配的任务for filename, task := range c.mapTasks {if task.status == 0 || (task.status == 1 && time.Since(task.startTime) > 10*time.Second) {// 分配任务task.status = 1task.version++task.startTime = time.Now()reply.Status = TaskStatusReadyreply.FileName = filenamereply.TaskNum = getTaskNum(filename) // 你的 MapTaskNumsreply.NReduce = c.nReducereply.Version = task.versionreturn nil}}// 所有任务都在执行中reply.Status = TaskStatusWaitreturn nil
}func (c *Coordinator) ReportMapTask(args *ReportMapTaskArgs, reply *ReportMapTaskReply) error {c.mu.Lock()defer c.mu.Unlock()task := c.mapTasks[args.FileName]// 检查版本号,只接受最新版本if task.version == args.Version && task.status != 2 {task.status = 2c.mapFinished++reply.Accept = true} else {reply.Accept = false}return nil
}// Reduce 任务类似...
rpc
package mr// 任务状态枚举
// TaskStatus is the coordinator's answer to a task request.
type TaskStatus int

const (
	TaskStatusReady   TaskStatus = iota // 0: a task was assigned and can be executed
	TaskStatusWait                      // 1: no task available right now, wait
	TaskStatusAllDone                   // 2: all tasks are finished
)

// ========== Map Task RPC ==========

// AllocateMapTaskArgs is the request of Coordinator.AllocateMapTask.
type AllocateMapTaskArgs struct {
	// A worker needs no arguments to request a task.
}

// AllocateMapTaskReply is the response of Coordinator.AllocateMapTask.
type AllocateMapTaskReply struct {
	Status   TaskStatus // task status
	FileName string     // file to process
	TaskNum  int        // map task number
	NReduce  int        // total number of reduce tasks
	Version  int        // task version (guards against duplicate execution)
}

// ReportMapTaskArgs is the request of Coordinator.ReportMapTask.
type ReportMapTaskArgs struct {
	FileName string // file whose map task completed
	TaskNum  int    // task number
	Version  int    // task version
}

// ReportMapTaskReply is the response of Coordinator.ReportMapTask.
type ReportMapTaskReply struct {
	Accept bool // whether the coordinator accepted this completion report
}

// ========== Reduce Task RPC ==========

// AllocateReduceTaskArgs is the request of Coordinator.AllocateReduceTask.
type AllocateReduceTaskArgs struct {
	// A worker needs no arguments to request a task.
}

// AllocateReduceTaskReply is the response of Coordinator.AllocateReduceTask.
type AllocateReduceTaskReply struct {
	Status    TaskStatus // task status
	ReduceNum int        // reduce task number
	NMapTasks int        // total number of map tasks (to locate intermediate files)
	Version   int        // task version
}

// ReportReduceTaskArgs is the request of Coordinator.ReportReduceTask.
type ReportReduceTaskArgs struct {
	ReduceNum int // reduce task number that completed
	Version   int // task version
}

// ReportReduceTaskReply is the response of Coordinator.ReportReduceTask.
type ReportReduceTaskReply struct {
	Accept bool // whether the coordinator accepted this completion report
}

// ========== Example RPC (can be removed) ==========

// ExampleArgs is the request of the example RPC.
type ExampleArgs struct {
	X int
}

// ExampleReply is the response of the example RPC.
type ExampleReply struct {
	Y int
}