From f469fc561f8ee66e1998b4f2b5d5ff8fb0d85a1d Mon Sep 17 00:00:00 2001 From: Paul Pan Date: Thu, 6 Oct 2022 09:57:17 +0800 Subject: [PATCH] lab1 --- .idea/.gitignore | 8 ++ .idea/6.824.iml | 9 ++ .idea/modules.xml | 8 ++ .idea/vcs.xml | 6 ++ src/go.mod | 2 +- src/main/test-mr-many.sh | 0 src/main/test-mr.sh | 0 src/mr/coordinator.go | 214 ++++++++++++++++++++++++++++++++++----- src/mr/rpc.go | 49 ++++++--- src/mr/worker.go | 180 +++++++++++++++++++++++++------- 10 files changed, 396 insertions(+), 80 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/6.824.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml mode change 100644 => 100755 src/main/test-mr-many.sh mode change 100644 => 100755 src/main/test-mr.sh diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/6.824.iml b/.idea/6.824.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/6.824.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..6d0f03a --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/src/go.mod b/src/go.mod index 20585e3..f554190 100644 --- a/src/go.mod +++ b/src/go.mod @@ -1,3 +1,3 @@ module 6.824 -go 1.15 +go 1.18 diff --git a/src/main/test-mr-many.sh b/src/main/test-mr-many.sh old mode 100644 new mode 100755 diff --git a/src/main/test-mr.sh b/src/main/test-mr.sh old mode 100644 new mode 100755 diff --git a/src/mr/coordinator.go b/src/mr/coordinator.go index cafda57..40f83ad 100644 --- a/src/mr/coordinator.go +++ b/src/mr/coordinator.go @@ -1,33 +1,181 @@ package mr -import "log" -import "net" -import "os" -import "net/rpc" -import "net/http" +import ( + "fmt" + "log" + "net" + "net/http" + "net/rpc" + "os" + "sync" + "time" +) +type TaskInfo struct { + Type TaskType + Id int + Wid int + Filename []string + StartTime time.Time + Regions int +} + +const ( + SchedulerTimeout = 10 * time.Microsecond + TaskTimeout = 5 * time.Second +) + +type Phase int + +const ( + PhaseMap = iota + PhaseReduce +) type Coordinator struct { // Your definitions here. - + regions int + nMaps int + done bool + tasks []TaskInfo + mu sync.Mutex + pending chan *TaskInfo + phase Phase + finished int } // Your code here -- RPC handlers for the worker to call. -// -// an example RPC handler. -// -// the RPC argument and reply types are defined in rpc.go. -// -func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { - reply.Y = args.X + 1 +func (c *Coordinator) debug() { + t := time.NewTimer(5 * time.Second) + defer t.Stop() + for { + <-t.C + log.Println("======================================") + log.Printf("Phase: %v, Finished: %d, Done: %v\n", c.phase, c.finished, c.done) + for _, t := range c.tasks { + log.Printf("Task %d type %v id %d wid %d time %v\n", t.Id, t.Type, t.Id, t.Wid, t.StartTime) + } + log.Println("======================================") + t.Reset(5 * time.Second) + } +} + +func (c *Coordinator) scheduler() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.phase == PhaseMap && c.nMaps == c.finished { + // log.Println("[C] Map phase finished") + + c.phase = PhaseReduce + c.finished = 0 + + filenames := make([][]string, c.regions) + for i := range c.tasks { + for j := 0; j < c.regions; j++ { + filenames[j] = append(filenames[j], fmt.Sprintf("mr-%d-%d-%d", + c.tasks[i].Wid, c.tasks[i].Id, j)) + } + } + + c.tasks = nil + for i := 0; i < c.regions; i++ { + c.tasks = append(c.tasks, TaskInfo{ + Type: TaskPendingReduce, + Id: i, + Wid: 0, + Filename: filenames[i], + Regions: c.regions, + }) + } + } + + scheduled := true + for i := range c.tasks { + if c.tasks[i].Type == TaskPendingMap { + // log.Println("[C] Map task", c.tasks[i].Id, "scheduled") + c.tasks[i].Type = TaskMapQueue + c.pending <- &c.tasks[i] + break + } else if c.tasks[i].Type == TaskMap { + if time.Now().Sub(c.tasks[i].StartTime) > TaskTimeout { + // log.Println("[C] Map task", c.tasks[i].Id, "timeout") + c.tasks[i].Type = TaskPendingMap + } + } else if c.tasks[i].Type == TaskPendingReduce { + // log.Println("[C] Reduce task", c.tasks[i].Id, "scheduled") + c.tasks[i].Type = TaskReduceQueue + c.pending <- &c.tasks[i] + break + } else if c.tasks[i].Type == TaskReduce { + if time.Now().Sub(c.tasks[i].StartTime) > TaskTimeout { + // log.Println("[C] Reduce task", c.tasks[i].Id, "timeout") + c.tasks[i].Type = TaskPendingReduce + } + } else { + scheduled = false + } + } + + if !scheduled && c.phase == PhaseReduce && c.regions == c.finished { + // log.Println("[C] Reduce phase finished") + close(c.pending) + c.done = true + } +} + +func (c *Coordinator) timer() { + t := time.NewTimer(SchedulerTimeout) + defer t.Stop() + for { + <-t.C + c.scheduler() + if c.done { + break + } + t.Reset(SchedulerTimeout) + } +} + +func (c *Coordinator) GetWork(args *TaskArg, reply *TaskReply) error { + task, ok := <-c.pending + if ok { + c.mu.Lock() + task.StartTime = time.Now() + task.Type += 1 + c.mu.Unlock() + + // log.Println("[C] Worker", args.Wid, "get task", task.Id, "type", task.Type) + reply.Task = *task + } else { + reply.Task = TaskInfo{Type: TaskSleep} + } return nil } +func (c *Coordinator) Report(args *ReportArgs, reply *Empty) error { + c.mu.Lock() + defer c.mu.Unlock() + + // log.Println("[C] Worker", args.Wid, "report task", args.Tid, "type", args.Type) + for i := range c.tasks { + if c.tasks[i].Id == args.Tid && c.tasks[i].Type == args.Type { + if c.tasks[i].Wid != 0 { + // already finished + // log.Println("[C] Task already finished", c.tasks[i].Wid) + break + } + c.finished++ + c.tasks[i].Type++ + c.tasks[i].Wid = args.Wid + } + } + + return nil +} -// // start a thread that listens for RPCs from worker.go -// func (c *Coordinator) server() { rpc.Register(c) rpc.HandleHTTP() @@ -39,31 +187,47 @@ func (c *Coordinator) server() { log.Fatal("listen error:", e) } go http.Serve(l, nil) + go c.timer() + // go c.debug() } -// // main/mrcoordinator.go calls Done() periodically to find out // if the entire job has finished. -// func (c *Coordinator) Done() bool { - ret := false + c.mu.Lock() + defer c.mu.Unlock() - // Your code here. - - - return ret + return c.done +} + +func max[T int](a, b T) T { + if a < b { + return b + } else { + return a + } } -// // create a Coordinator. // main/mrcoordinator.go calls this function. // nReduce is the number of reduce tasks to use. -// func MakeCoordinator(files []string, nReduce int) *Coordinator { c := Coordinator{} // Your code here. - + c.regions = nReduce + c.nMaps = len(files) + c.tasks = make([]TaskInfo, c.nMaps) + c.pending = make(chan *TaskInfo, max(c.nMaps, c.regions)) + for i := 0; i < c.nMaps; i++ { + c.tasks[i] = TaskInfo{ + Type: TaskPendingMap, + Id: i, + Wid: 0, + Filename: []string{files[i]}, + Regions: nReduce, + } + } c.server() return &c diff --git a/src/mr/rpc.go b/src/mr/rpc.go index abffa81..df190f8 100644 --- a/src/mr/rpc.go +++ b/src/mr/rpc.go @@ -6,24 +6,43 @@ package mr // remember to capitalize all names. // -import "os" -import "strconv" - -// -// example to show how to declare the arguments -// and reply for an RPC. -// - -type ExampleArgs struct { - X int -} - -type ExampleReply struct { - Y int -} +import ( + "os" + "strconv" +) // Add your RPC definitions here. +type TaskType int + +const ( + TaskPendingMap TaskType = iota + TaskMapQueue + TaskMap + TaskMapFinished + TaskPendingReduce + TaskReduceQueue + TaskReduce + TaskReduceFinished + TaskSleep +) + +type TaskArg struct { + Wid int +} + +type TaskReply struct { + Task TaskInfo +} + +type ReportArgs struct { + Tid int + Wid int + Type TaskType +} + +type Empty struct { +} // Cook up a unique-ish UNIX-domain socket name // in /var/tmp, for the coordinator. diff --git a/src/mr/worker.go b/src/mr/worker.go index aaa8b64..86e6955 100644 --- a/src/mr/worker.go +++ b/src/mr/worker.go @@ -1,77 +1,179 @@ package mr -import "fmt" -import "log" -import "net/rpc" -import "hash/fnv" +import ( + "encoding/json" + "fmt" + "hash/fnv" + "io" + "log" + "net/rpc" + "os" + "sort" +) - -// // Map functions return a slice of KeyValue. -// type KeyValue struct { Key string Value string } -// // use ihash(key) % NReduce to choose the reduce // task number for each KeyValue emitted by Map. -// func ihash(key string) int { h := fnv.New32a() h.Write([]byte(key)) return int(h.Sum32() & 0x7fffffff) } - -// // main/mrworker.go calls this function. -// func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) { // Your worker implementation here. + wid := os.Getpid() + // log.Println("Worker", wid, "start") - // uncomment to send the Example RPC to the coordinator. - // CallExample() + for { + work := GetWork(wid) -} + if work.Type == TaskMap { + if len(work.Filename) != 1 { + log.Fatalf("mismatched work, len = %v", len(work.Filename)) + } -// -// example function to show how to make an RPC call to the coordinator. -// -// the RPC argument and reply types are defined in rpc.go. -// -func CallExample() { + file, err := os.Open(work.Filename[0]) + if err != nil { + log.Fatalf("cannot open %v", work.Filename[0]) + } - // declare an argument structure. - args := ExampleArgs{} + content, err := io.ReadAll(file) + if err != nil { + log.Fatalf("cannot read %v", work.Filename[0]) + } - // fill in the argument(s). - args.X = 99 + _ = file.Close() - // declare a reply structure. - reply := ExampleReply{} + kva := mapf(work.Filename[0], string(content)) + intermediate := make(map[int][]KeyValue, work.Regions) - // send the RPC request, wait for the reply. - // the "Coordinator.Example" tells the - // receiving server that we'd like to call - // the Example() method of struct Coordinator. - ok := call("Coordinator.Example", &args, &reply) - if ok { - // reply.Y should be 100. - fmt.Printf("reply.Y %v\n", reply.Y) - } else { - fmt.Printf("call failed!\n") + for _, kv := range kva { + id := ihash(kv.Key) % work.Regions + intermediate[id] = append(intermediate[id], kv) + } + + for i := 0; i < work.Regions; i++ { + file, err := os.CreateTemp("./", "mr-worker-") + if err != nil { + log.Fatalf("cannot create temp file: %v", err.Error()) + } + + content := json.NewEncoder(file) + for _, kv := range intermediate[i] { + _ = content.Encode(&kv) + } + _ = file.Close() + + filename := fmt.Sprintf("./mr-%d-%d-%d", wid, work.Id, i) + err = os.Rename(file.Name(), filename) + if err != nil { + log.Fatalf("cannot rename %v to %v. err=%v", file.Name(), filename, err.Error()) + } + } + + Report(work.Id, wid, TaskMap) + + } else if work.Type == TaskReduce { + if len(work.Filename) == 0 { + log.Fatalln("empty reduce work") + } + + var intermediate []KeyValue + for _, filename := range work.Filename { + file, err := os.Open(filename) + if err != nil { + log.Fatalf("cannot open %v", filename) + } + + content := json.NewDecoder(file) + for { + var kv KeyValue + if err := content.Decode(&kv); err != nil { + break + } + intermediate = append(intermediate, kv) + } + _ = file.Close() + } + + sort.Slice(intermediate, func(i, j int) bool { + return intermediate[i].Key < intermediate[j].Key + }) + + file, err := os.CreateTemp("./", "mr-worker-") + if err != nil { + log.Fatalf("cannot create temp file: %v", err.Error()) + } + + i := 0 + for i < len(intermediate) { + j := i + 1 + for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { + j++ + } + var values []string + for k := i; k < j; k++ { + values = append(values, intermediate[k].Value) + } + output := reducef(intermediate[i].Key, values) + + _, _ = fmt.Fprintf(file, "%v %v\n", intermediate[i].Key, output) + + i = j + } + _ = file.Close() + filename := fmt.Sprintf("./mr-out-%v", work.Id) + err = os.Rename(file.Name(), filename) + if err != nil { + log.Fatalf("cannot rename %v to %v. err=%v", file.Name(), filename, err.Error()) + } + + Report(work.Id, wid, TaskReduce) + + } else { + break + } } } -// +func Report(tid int, wid int, typ TaskType) { + args := &ReportArgs{Tid: tid, Wid: wid, Type: typ} + reply := &Empty{} + + // log.Println("Report", args) + + ok := call("Coordinator.Report", args, reply) + if !ok { + log.Println("call failed!") + } +} + +func GetWork(wid int) TaskInfo { + args := &TaskArg{Wid: wid} + reply := &TaskReply{} + + ok := call("Coordinator.GetWork", args, reply) + if !ok { + // log.Println("call failed!") + return TaskInfo{Type: TaskSleep} + } + + // log.Println("GetWork", reply.Task) + return reply.Task +} + // send an RPC request to the coordinator, wait for the response. // usually returns true. // returns false if something goes wrong. -// func call(rpcname string, args interface{}, reply interface{}) bool { // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") sockname := coordinatorSock()