This commit is contained in:
Paul Pan 2022-10-06 09:57:17 +08:00
parent d94ea16780
commit f469fc561f
10 changed files with 396 additions and 80 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

9
.idea/6.824.iml Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/6.824.iml" filepath="$PROJECT_DIR$/.idea/6.824.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@ -1,3 +1,3 @@
module 6.824 module 6.824
go 1.15 go 1.18

0
src/main/test-mr-many.sh Normal file → Executable file
View File

0
src/main/test-mr.sh Normal file → Executable file
View File

View File

@ -1,33 +1,181 @@
package mr package mr
import "log" import (
import "net" "fmt"
import "os" "log"
import "net/rpc" "net"
import "net/http" "net/http"
"net/rpc"
"os"
"sync"
"time"
)
type TaskInfo struct {
Type TaskType
Id int
Wid int
Filename []string
StartTime time.Time
Regions int
}
const (
SchedulerTimeout = 10 * time.Microsecond
TaskTimeout = 5 * time.Second
)
type Phase int
const (
PhaseMap = iota
PhaseReduce
)
type Coordinator struct { type Coordinator struct {
// Your definitions here. // Your definitions here.
regions int
nMaps int
done bool
tasks []TaskInfo
mu sync.Mutex
pending chan *TaskInfo
phase Phase
finished int
} }
// Your code here -- RPC handlers for the worker to call. // Your code here -- RPC handlers for the worker to call.
// func (c *Coordinator) debug() {
// an example RPC handler. t := time.NewTimer(5 * time.Second)
// defer t.Stop()
// the RPC argument and reply types are defined in rpc.go. for {
// <-t.C
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error { log.Println("======================================")
reply.Y = args.X + 1 log.Printf("Phase: %v, Finished: %d, Done: %v\n", c.phase, c.finished, c.done)
for _, t := range c.tasks {
log.Printf("Task %d type %v id %d wid %d time %v\n", t.Id, t.Type, t.Id, t.Wid, t.StartTime)
}
log.Println("======================================")
t.Reset(5 * time.Second)
}
}
func (c *Coordinator) scheduler() {
c.mu.Lock()
defer c.mu.Unlock()
if c.phase == PhaseMap && c.nMaps == c.finished {
// log.Println("[C] Map phase finished")
c.phase = PhaseReduce
c.finished = 0
filenames := make([][]string, c.regions)
for i := range c.tasks {
for j := 0; j < c.regions; j++ {
filenames[j] = append(filenames[j], fmt.Sprintf("mr-%d-%d-%d",
c.tasks[i].Wid, c.tasks[i].Id, j))
}
}
c.tasks = nil
for i := 0; i < c.regions; i++ {
c.tasks = append(c.tasks, TaskInfo{
Type: TaskPendingReduce,
Id: i,
Wid: 0,
Filename: filenames[i],
Regions: c.regions,
})
}
}
scheduled := true
for i := range c.tasks {
if c.tasks[i].Type == TaskPendingMap {
// log.Println("[C] Map task", c.tasks[i].Id, "scheduled")
c.tasks[i].Type = TaskMapQueue
c.pending <- &c.tasks[i]
break
} else if c.tasks[i].Type == TaskMap {
if time.Now().Sub(c.tasks[i].StartTime) > TaskTimeout {
// log.Println("[C] Map task", c.tasks[i].Id, "timeout")
c.tasks[i].Type = TaskPendingMap
}
} else if c.tasks[i].Type == TaskPendingReduce {
// log.Println("[C] Reduce task", c.tasks[i].Id, "scheduled")
c.tasks[i].Type = TaskReduceQueue
c.pending <- &c.tasks[i]
break
} else if c.tasks[i].Type == TaskReduce {
if time.Now().Sub(c.tasks[i].StartTime) > TaskTimeout {
// log.Println("[C] Reduce task", c.tasks[i].Id, "timeout")
c.tasks[i].Type = TaskPendingReduce
}
} else {
scheduled = false
}
}
if !scheduled && c.phase == PhaseReduce && c.regions == c.finished {
// log.Println("[C] Reduce phase finished")
close(c.pending)
c.done = true
}
}
func (c *Coordinator) timer() {
t := time.NewTimer(SchedulerTimeout)
defer t.Stop()
for {
<-t.C
c.scheduler()
if c.done {
break
}
t.Reset(SchedulerTimeout)
}
}
func (c *Coordinator) GetWork(args *TaskArg, reply *TaskReply) error {
task, ok := <-c.pending
if ok {
c.mu.Lock()
task.StartTime = time.Now()
task.Type += 1
c.mu.Unlock()
// log.Println("[C] Worker", args.Wid, "get task", task.Id, "type", task.Type)
reply.Task = *task
} else {
reply.Task = TaskInfo{Type: TaskSleep}
}
return nil return nil
} }
func (c *Coordinator) Report(args *ReportArgs, reply *Empty) error {
c.mu.Lock()
defer c.mu.Unlock()
// log.Println("[C] Worker", args.Wid, "report task", args.Tid, "type", args.Type)
for i := range c.tasks {
if c.tasks[i].Id == args.Tid && c.tasks[i].Type == args.Type {
if c.tasks[i].Wid != 0 {
// already finished
// log.Println("[C] Task already finished", c.tasks[i].Wid)
break
}
c.finished++
c.tasks[i].Type++
c.tasks[i].Wid = args.Wid
}
}
return nil
}
//
// start a thread that listens for RPCs from worker.go // start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() { func (c *Coordinator) server() {
rpc.Register(c) rpc.Register(c)
rpc.HandleHTTP() rpc.HandleHTTP()
@ -39,31 +187,47 @@ func (c *Coordinator) server() {
log.Fatal("listen error:", e) log.Fatal("listen error:", e)
} }
go http.Serve(l, nil) go http.Serve(l, nil)
go c.timer()
// go c.debug()
} }
//
// main/mrcoordinator.go calls Done() periodically to find out // main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished. // if the entire job has finished.
//
func (c *Coordinator) Done() bool { func (c *Coordinator) Done() bool {
ret := false c.mu.Lock()
defer c.mu.Unlock()
// Your code here. return c.done
}
return ret func max[T int](a, b T) T {
if a < b {
return b
} else {
return a
}
} }
//
// create a Coordinator. // create a Coordinator.
// main/mrcoordinator.go calls this function. // main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use. // nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator { func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{} c := Coordinator{}
// Your code here. // Your code here.
c.regions = nReduce
c.nMaps = len(files)
c.tasks = make([]TaskInfo, c.nMaps)
c.pending = make(chan *TaskInfo, max(c.nMaps, c.regions))
for i := 0; i < c.nMaps; i++ {
c.tasks[i] = TaskInfo{
Type: TaskPendingMap,
Id: i,
Wid: 0,
Filename: []string{files[i]},
Regions: nReduce,
}
}
c.server() c.server()
return &c return &c

View File

@ -6,24 +6,43 @@ package mr
// remember to capitalize all names. // remember to capitalize all names.
// //
import "os" import (
import "strconv" "os"
"strconv"
// )
// example to show how to declare the arguments
// and reply for an RPC.
//
type ExampleArgs struct {
X int
}
type ExampleReply struct {
Y int
}
// Add your RPC definitions here. // Add your RPC definitions here.
type TaskType int
const (
TaskPendingMap TaskType = iota
TaskMapQueue
TaskMap
TaskMapFinished
TaskPendingReduce
TaskReduceQueue
TaskReduce
TaskReduceFinished
TaskSleep
)
type TaskArg struct {
Wid int
}
type TaskReply struct {
Task TaskInfo
}
type ReportArgs struct {
Tid int
Wid int
Type TaskType
}
type Empty struct {
}
// Cook up a unique-ish UNIX-domain socket name // Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator. // in /var/tmp, for the coordinator.

View File

@ -1,77 +1,179 @@
package mr package mr
import "fmt" import (
import "log" "encoding/json"
import "net/rpc" "fmt"
import "hash/fnv" "hash/fnv"
"io"
"log"
"net/rpc"
"os"
"sort"
)
//
// Map functions return a slice of KeyValue. // Map functions return a slice of KeyValue.
//
type KeyValue struct { type KeyValue struct {
Key string Key string
Value string Value string
} }
//
// use ihash(key) % NReduce to choose the reduce // use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map. // task number for each KeyValue emitted by Map.
//
func ihash(key string) int { func ihash(key string) int {
h := fnv.New32a() h := fnv.New32a()
h.Write([]byte(key)) h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff) return int(h.Sum32() & 0x7fffffff)
} }
//
// main/mrworker.go calls this function. // main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) { reducef func(string, []string) string) {
// Your worker implementation here. // Your worker implementation here.
wid := os.Getpid()
// log.Println("Worker", wid, "start")
// uncomment to send the Example RPC to the coordinator. for {
// CallExample() work := GetWork(wid)
} if work.Type == TaskMap {
if len(work.Filename) != 1 {
log.Fatalf("mismatched work, len = %v", len(work.Filename))
}
// file, err := os.Open(work.Filename[0])
// example function to show how to make an RPC call to the coordinator. if err != nil {
// log.Fatalf("cannot open %v", work.Filename[0])
// the RPC argument and reply types are defined in rpc.go. }
//
func CallExample() {
// declare an argument structure. content, err := io.ReadAll(file)
args := ExampleArgs{} if err != nil {
log.Fatalf("cannot read %v", work.Filename[0])
}
// fill in the argument(s). _ = file.Close()
args.X = 99
// declare a reply structure. kva := mapf(work.Filename[0], string(content))
reply := ExampleReply{} intermediate := make(map[int][]KeyValue, work.Regions)
// send the RPC request, wait for the reply. for _, kv := range kva {
// the "Coordinator.Example" tells the id := ihash(kv.Key) % work.Regions
// receiving server that we'd like to call intermediate[id] = append(intermediate[id], kv)
// the Example() method of struct Coordinator. }
ok := call("Coordinator.Example", &args, &reply)
if ok { for i := 0; i < work.Regions; i++ {
// reply.Y should be 100. file, err := os.CreateTemp("./", "mr-worker-")
fmt.Printf("reply.Y %v\n", reply.Y) if err != nil {
} else { log.Fatalf("cannot create temp file: %v", err.Error())
fmt.Printf("call failed!\n") }
content := json.NewEncoder(file)
for _, kv := range intermediate[i] {
_ = content.Encode(&kv)
}
_ = file.Close()
filename := fmt.Sprintf("./mr-%d-%d-%d", wid, work.Id, i)
err = os.Rename(file.Name(), filename)
if err != nil {
log.Fatalf("cannot rename %v to %v. err=%v", file.Name(), filename, err.Error())
}
}
Report(work.Id, wid, TaskMap)
} else if work.Type == TaskReduce {
if len(work.Filename) == 0 {
log.Fatalln("empty reduce work")
}
var intermediate []KeyValue
for _, filename := range work.Filename {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content := json.NewDecoder(file)
for {
var kv KeyValue
if err := content.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
_ = file.Close()
}
sort.Slice(intermediate, func(i, j int) bool {
return intermediate[i].Key < intermediate[j].Key
})
file, err := os.CreateTemp("./", "mr-worker-")
if err != nil {
log.Fatalf("cannot create temp file: %v", err.Error())
}
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
_, _ = fmt.Fprintf(file, "%v %v\n", intermediate[i].Key, output)
i = j
}
_ = file.Close()
filename := fmt.Sprintf("./mr-out-%v", work.Id)
err = os.Rename(file.Name(), filename)
if err != nil {
log.Fatalf("cannot rename %v to %v. err=%v", file.Name(), filename, err.Error())
}
Report(work.Id, wid, TaskReduce)
} else {
break
}
} }
} }
// func Report(tid int, wid int, typ TaskType) {
args := &ReportArgs{Tid: tid, Wid: wid, Type: typ}
reply := &Empty{}
// log.Println("Report", args)
ok := call("Coordinator.Report", args, reply)
if !ok {
log.Println("call failed!")
}
}
func GetWork(wid int) TaskInfo {
args := &TaskArg{Wid: wid}
reply := &TaskReply{}
ok := call("Coordinator.GetWork", args, reply)
if !ok {
// log.Println("call failed!")
return TaskInfo{Type: TaskSleep}
}
// log.Println("GetWork", reply.Task)
return reply.Task
}
// send an RPC request to the coordinator, wait for the response. // send an RPC request to the coordinator, wait for the response.
// usually returns true. // usually returns true.
// returns false if something goes wrong. // returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool { func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234") // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock() sockname := coordinatorSock()