当前位置: 首页 > news >正文

MIT6.824-MapReduce

Lab1: MapReduce(个人记录版)

(烂完了,深刻感觉到自己有多菜,各个方面)
最初的版本是全部自己写的,测试的时候会出现时而成功,时而失败的情况,重写两遍的我已经不想再看两遍代码了,后面让AI帮忙分析,然后一点点的修改,最后出问题的地方是在Call4Map(),忘记更新那个状态了。

早些时候做的设计重复且没有用(除了对MapReduce的理解问题,还有就是刚接触的时候感觉逻辑复杂,一度不想写,断断续续写了好久,同时对go也不是很熟悉),后面改了好几次才有git上第一次提交的版本,第一次编译的时候错误很多,改了很久。

总体来说,确实没那么复杂,懂了才敢这么说。下面就不对代码做过多分析了, 仅仅说一下自己的思路,方便后面的复习。

关于怎么通信的没有关注,似乎是`http.Serve`吧,`server`函数很重要,后面看看。

Coordinator

作为一个协调器,需要考虑到维护一些状态,主要就是下面几个功能:

  • 分配Map任务
  • 对已经完成的Map做标记
  • 分配Reduce任务
  • 对已经完成的Reduce做标记

剩下的都是完成的事情。其中需要注意的是并发的情况,共享的数据是不能同时访问的,go的锁机制非常舒服,几乎不用怎么考虑锁。

下面这几个变量很重要,注释有说明。这里最初的设计非常失败,这里都是后面修改的,不然看不了。

	nReduce int // reduce的任务数量,最终的reduce至少要分配几次

	// map
	MapTaskNums map[string]int       // 每一个文件都只能对应一个TaskNum
	mapFinNum   int                  // 已经完成的Map任务数量
	Files       map[string]int       // 0: unFinished. 1:allocated. 2:completed. 每个任务的状态
	MapRime     map[string]time.Time // 任务开始的时间,超时重新分配

	// reduce
	reduceFin    []int       // 0: unFinished. 1:allocated. 2:completed. 每个任务的状态
	reduceFinNum int         // 已经完成的Reduce任务数量
	ReduceRime   []time.Time // 任务开始的时间,超时重新分配

下面是完整代码,关于时间的我直接AI辅助了,之前没有接触过,AI写例子,我来抄

package mrimport ("log""net""net/http""net/rpc""os""sync""time"
)type Coordinator struct {// Your definitions here.mu      sync.MutexnReduce int//mapMapTaskNums map[string]int // for fileNamemapFinNum   intFiles       map[string]int // 0: unFinished. 1:allocated. 2:completed.MapRime     map[string]time.Time//reducereduceFin    []int // 0: unFinished. 1:allocated. 2:completed.reduceFinNum intReduceRime   []time.Time
}func (c *Coordinator) AllocateMapTask(args *MapArgs, reply *MapReply) error {reply.FileName, reply.TaskNum = c.allocateMap()reply.NReduce = c.nReducereturn nil
}func (c *Coordinator) allocateMap() (string, int) {c.mu.Lock()defer c.mu.Unlock()if c.mapFinNum == len(c.Files) {return "", -1}for filename, allocated := range c.Files {if allocated == 0 {c.Files[filename] = 1c.MapRime[filename] = time.Now()return filename, c.MapTaskNums[filename]}if allocated == 1 && !c.MapRime[filename].IsZero() && time.Since(c.MapRime[filename]) > 10*time.Second {c.MapRime[filename] = time.Now()return filename, c.MapTaskNums[filename]}}return "", -2
}func (c *Coordinator) FinishedMap(args *MapFinArgs, reply *MapFinReply) error {c.mu.Lock()defer c.mu.Unlock()if args.FileName != "" {if c.Files[args.FileName] != 2 {c.Files[args.FileName] = 2c.mapFinNum++}if c.mapFinNum == len(c.Files) {reply.AllFinished = true}}return nil
}func (c *Coordinator) AllocateReduceTask(args *ReduceArgs, reply *ReduceReply) error {reply.FileNum = len(c.Files)reply.ReduceNum = c.allocateReduce()return nil
}func (c *Coordinator) allocateReduce() int {c.mu.Lock()defer c.mu.Unlock()// 如果 map 阶段还没完成,则不分配 reduce 任务if c.mapFinNum != len(c.Files) {return -1}if c.reduceFinNum == c.nReduce {return -2}for idx, allocated := range c.reduceFin {if allocated == 0 {c.reduceFin[idx] = 1c.ReduceRime[idx] = time.Now()return idx}if allocated == 1 && !c.ReduceRime[idx].IsZero() && time.Since(c.ReduceRime[idx]) > 10*time.Second {c.ReduceRime[idx] = time.Now()return idx}}return -1
}func (c *Coordinator) FinishedReduce(args *ReduceFinArgs, reply *ReduceFinReply) error {c.mu.Lock()defer c.mu.Unlock()if c.reduceFin[args.TaskNum] != 2 {c.reduceFin[args.TaskNum] = 2c.reduceFinNum++}if c.reduceFinNum == c.nReduce {reply.AllFinished = true}return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {c.mu.Lock()defer c.mu.Unlock()ret := falseif c.reduceFinNum == c.nReduce {time.Sleep(5 * time.Second)ret = true}// Your code here.return ret
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{}c.nReduce = nReducec.Files = make(map[string]int)c.MapTaskNums = make(map[string]int)// Your code here.for idx, filename := range files {c.Files[filename] = 0c.MapTaskNums[filename] = idx}c.reduceFin = make([]int, nReduce)c.MapRime = make(map[string]time.Time, len(c.Files))c.ReduceRime = make([]time.Time, nReduce)c.server()return &c
}

Worker

工作节点,执行Map、Reduce任务的,现在看来没有什么,最初没有考虑到退出的事情,没有进行无限循环(也是因为对go不熟,并且没有这方面的经验)。其他的还好,就是最初写的Call4Map最后被忽视,一直错误,并且一度觉得自己逻辑正确,怎么就不对呢?

package mrimport ("encoding/json""fmt""hash/fnv""io/ioutil""log""net/rpc""os""sort""time"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key   stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}func readfile(filename string) string {file, err := os.Open(filename)if err != nil {log.Fatalf("cannot open %v", filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", filename)}file.Close()return string(content)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.// uncomment to send the Example RPC to the coordinator.// CallExample()for {filename, num4Task, nReduce := Call4Map()if num4Task == -1 {break}if filename == "" {time.Sleep(time.Second)continue}// read file contentcontent := readfile(filename)// map file and save to filekva := mapf(filename, content)ofiles := make([]*os.File, nReduce)encoders := make([]*json.Encoder, nReduce)for i := 0; i < nReduce; i++ {oname := fmt.Sprintf("mr-%d-%d", num4Task, i)ofiles[i], _ = os.Create(oname)encoders[i] = json.NewEncoder(ofiles[i])}for _, kv := range kva {reduceIdx := ihash(kv.Key) % nReduceif err := encoders[reduceIdx].Encode(&kv); err != nil {fmt.Println("map write to file error!")}}for i := 0; i < nReduce; i++ {ofiles[i].Close()}// completed map task, tell coordinatorargs := MapFinArgs{filename}reply := MapFinReply{}call("Coordinator.FinishedMap", &args, &reply)}for {fileNum, reduceNum := Call4Reduce()if reduceNum == -2 {break}if reduceNum == -1 {time.Sleep(time.Second)continue}kva := []KeyValue{}for i := 0; i < fileNum; i++ {taskFile := fmt.Sprintf("mr-%d-%d", i, reduceNum)file, err := os.Open(taskFile)if err != nil {continue}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}kva = append(kva, kv)}file.Close()}sort.Sort(ByKey(kva))oname := fmt.Sprintf("mr-out-%d", reduceNum)ofile, _ := os.Create(oname)i := 0for i < len(kva) {j := i + 1for j < len(kva) && kva[j].Key == kva[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, kva[k].Value)}output := reducef(kva[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)i = j}ofile.Close()args := ReduceFinArgs{reduceNum}reply := ReduceFinReply{}call("Coordinator.FinishedReduce", &args, &reply)}
}func Call4Map() (string, int, int) {args := MapArgs{0}reply := MapReply{}ok := call("Coordinator.AllocateMapTask", &args, &reply)if !ok {return "", -1, 0 //直接返回,退出map阶段}return reply.FileName, reply.TaskNum, reply.NReduce
}func Call4Reduce() (int, int) {args := ReduceArgs{}reply := ReduceReply{}ok := call("Coordinator.AllocateReduceTask", &args, &reply)if !ok {return 0, -2 //直接返回,退出reduce阶段}return reply.FileNum, reply.ReduceNum
}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}

RPC

RPC 部分定义通信过程中需要传输的数据结构,字段首字母一定要大写。目前还有多余的部分,懒得删了,也不知道怎么设计更好,但是请求必须带个参数。最后附一个AI设计的版本,更好、更优雅,但是还需要改动其他的代码。设计这部分还是要好好学习,我的代码真是丑爆了。

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("os""strconv"
)// Add your RPC definitions here.
type MapArgs struct {N int
}type MapReply struct {FileName stringTaskNum  int // for filename, -1:结束, -2:忙碌等待,其他:任务号NReduce  int
}type MapFinArgs struct {FileName string
}type MapFinReply struct {AllFinished bool
}type ReduceArgs struct {FileName string
}type ReduceReply struct {FileNum int// -1:map未结束,等待, -2:结束,其他:任务号ReduceNum int // among of nReduce
}type ReduceFinArgs struct {TaskNum int
}type ReduceFinReply struct {AllFinished bool
}// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}

AI

Workerfunc Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// ========== Map 阶段 ==========for {reply := requestMapTask()switch reply.Status {case TaskStatusAllDone:// Map 阶段全部完成breakcase TaskStatusWait:// 暂时没有任务,等待time.Sleep(time.Second)continuecase TaskStatusReady:// 执行 Map 任务executeMapTask(mapf, reply)}if reply.Status == TaskStatusAllDone {break}}// ========== Reduce 阶段 ==========for {reply := requestReduceTask()switch reply.Status {case TaskStatusAllDone:// Reduce 阶段全部完成,Worker 退出returncase TaskStatusWait:// 暂时没有任务,等待time.Sleep(time.Second)continuecase TaskStatusReady:// 执行 Reduce 任务executeReduceTask(reducef, reply)}}
}func requestMapTask() AllocateMapTaskReply {args := AllocateMapTaskArgs{}reply := AllocateMapTaskReply{}ok := call("Coordinator.AllocateMapTask", &args, &reply)if !ok {// RPC 失败,Coordinator 可能已退出reply.Status = TaskStatusAllDone}return reply
}func executeMapTask(mapf func(string, string) []KeyValue, task AllocateMapTaskReply) {// 读取文件content := readfile(task.FileName)kva := mapf(task.FileName, content)// 写入中间文件writeIntermediateFiles(kva, task.TaskNum, task.NReduce)// 报告完成args := ReportMapTaskArgs{FileName: task.FileName,TaskNum:  task.TaskNum,Version:  task.Version,}reply := ReportMapTaskReply{}call("Coordinator.ReportMapTask", &args, &reply)
}func requestReduceTask() AllocateReduceTaskReply {args := AllocateReduceTaskArgs{}reply := AllocateReduceTaskReply{}ok := call("Coordinator.AllocateReduceTask", &args, &reply)if !ok {// RPC 失败reply.Status = TaskStatusAllDone}return reply
}func executeReduceTask(reducef func(string, []string) string, task AllocateReduceTaskReply) {// 读取中间文件kva := readIntermediateFiles(task.ReduceNum, task.NMapTasks)// 排序sort.Sort(ByKey(kva))// 写入输出文件writeOutputFile(reducef, kva, task.ReduceNum)// 报告完成args := ReportReduceTaskArgs{ReduceNum: task.ReduceNum,Version:   task.Version,}reply := ReportReduceTaskReply{}call("Coordinator.ReportReduceTask", &args, &reply)
}
coordinator
type Coordinator struct {mu sync.Mutex// Map 任务相关nMapTasks      intmapTasks       map[string]*TaskInfo  // filename -> task infomapFinished    int// Reduce 任务相关nReduce        intreduceTasks    []*TaskInforeduceFinished int
}type TaskInfo struct {status    int       // 0: idle, 1: running, 2: finishedversion   int       // 任务版本号startTime time.Time // 任务开始时间
}func (c *Coordinator) AllocateMapTask(args *AllocateMapTaskArgs, reply *AllocateMapTaskReply) error {c.mu.Lock()defer c.mu.Unlock()// 所有任务已完成if c.mapFinished == c.nMapTasks {reply.Status = TaskStatusAllDonereturn nil}// 寻找可分配的任务for filename, task := range c.mapTasks {if task.status == 0 || (task.status == 1 && time.Since(task.startTime) > 10*time.Second) {// 分配任务task.status = 1task.version++task.startTime = time.Now()reply.Status = TaskStatusReadyreply.FileName = filenamereply.TaskNum = getTaskNum(filename) // 你的 MapTaskNumsreply.NReduce = c.nReducereply.Version = task.versionreturn nil}}// 所有任务都在执行中reply.Status = TaskStatusWaitreturn nil
}func (c *Coordinator) ReportMapTask(args *ReportMapTaskArgs, reply *ReportMapTaskReply) error {c.mu.Lock()defer c.mu.Unlock()task := c.mapTasks[args.FileName]// 检查版本号,只接受最新版本if task.version == args.Version && task.status != 2 {task.status = 2c.mapFinished++reply.Accept = true} else {reply.Accept = false}return nil
}// Reduce 任务类似...
rpc
package mr// 任务状态枚举
type TaskStatus intconst (TaskStatusReady      TaskStatus = 0 // 任务可执行TaskStatusWait       TaskStatus = 1 // 暂时无任务,等待TaskStatusAllDone    TaskStatus = 2 // 所有任务完成
)// ========== Map Task RPC ==========type AllocateMapTaskArgs struct {// Worker 请求任务时不需要参数
}type AllocateMapTaskReply struct {Status   TaskStatus // 任务状态FileName string     // 要处理的文件名TaskNum  int        // Map 任务编号NReduce  int        // Reduce 任务总数Version  int        // 任务版本号(防止重复执行)
}type ReportMapTaskArgs struct {FileName string // 完成的文件名TaskNum  int    // 任务编号Version  int    // 任务版本号
}type ReportMapTaskReply struct {Accept bool // Coordinator 是否接受这次完成报告
}// ========== Reduce Task RPC ==========type AllocateReduceTaskArgs struct {// Worker 请求任务时不需要参数
}type AllocateReduceTaskReply struct {Status     TaskStatus // 任务状态ReduceNum  int        // Reduce 任务编号NMapTasks  int        // Map 任务总数(用于读取中间文件)Version    int        // 任务版本号
}type ReportReduceTaskArgs struct {ReduceNum int // 完成的 Reduce 任务编号Version   int // 任务版本号
}type ReportReduceTaskReply struct {Accept bool // Coordinator 是否接受这次完成报告
}// ========== Example RPC (可删除) ==========type ExampleArgs struct {X int
}type ExampleReply struct {Y int
}
http://www.hskmm.com/?act=detail&tid=36987

相关文章:

  • 直流电机编码器测速
  • 搜索百科(5):Easysearch — 自主可控的国产分布式搜索引擎
  • 20251022 之所思 - 人生如梦
  • AI 赋能 + 场景破界 低代码平台的未来发展趋势
  • 迎面走来的是邪恶构造题
  • 中小企业数字化转型难?低代码的轻量化破局方案
  • 低代码引发组织协同革命 重塑数字化转型新逻辑
  • 日志|力扣|98.验证二叉搜索树|前序遍历|中序遍历|后序遍历|
  • 高级语言程序设计第二次个人作业
  • 2025年公众号编辑器排版还很难?如何用公众号编辑器做出精美文章?
  • 1022
  • 幂是任意常数的二项式定理
  • PowerShell---的办公小技能
  • 10月22日日记
  • 编译folly
  • 2025年独家测评:哪些微信公众号文章排版工具更适合自媒体运营?
  • 20232413 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • Stable Diffusion下载安装教程(附安装包)快速部署 AI 绘画工具
  • WAV和PCM的联系与区别
  • 使用WebSocket API驱动家庭自动化
  • 20232428 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • 20234320 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • 20232317 2025-2026-1《网络与系统攻防技术》实验二实验报告
  • 第2天(简单题中等题 取余、因数与倍数、数组 矩阵、数组 字符串)
  • python笔记
  • 20232326 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • 20232412 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • 20232403 2025-2026-1 《网络与系统攻防技术》实验二实验报告
  • NOIP2024
  • 20232415 2025-2026-1 《网络与系统攻防技术》 实验二实验报告