diff --git a/src/raft/config.go b/src/raft/config.go
index f17f092..f3f4b90 100644
--- a/src/raft/config.go
+++ b/src/raft/config.go
@@ -13,6 +13,7 @@ import "6.824/labrpc"
 import "bytes"
 import "log"
 import "sync"
+import "sync/atomic"
 import "testing"
 import "runtime"
 import "math/rand"
@@ -37,17 +38,19 @@ func makeSeed() int64 {
 }
 
 type config struct {
-	mu        sync.Mutex
-	t         *testing.T
-	net       *labrpc.Network
-	n         int
-	rafts     []*Raft
-	applyErr  []string // from apply channel readers
-	connected []bool   // whether each server is on the net
-	saved     []*Persister
-	endnames  [][]string            // the port file names each sends to
-	logs      []map[int]interface{} // copy of each server's committed entries
-	start     time.Time             // time at which make_config() was called
+	mu          sync.Mutex
+	t           *testing.T
+	finished    int32
+	net         *labrpc.Network
+	n           int
+	rafts       []*Raft
+	applyErr    []string // from apply channel readers
+	connected   []bool   // whether each server is on the net
+	saved       []*Persister
+	endnames    [][]string            // the port file names each sends to
+	logs        []map[int]interface{} // copy of each server's committed entries
+	lastApplied []int
+	start       time.Time // time at which make_config() was called
 	// begin()/end() statistics
 	t0        time.Time // time at which test_test.go called cfg.begin()
 	rpcs0     int       // rpcTotal() at start of test
@@ -77,6 +80,7 @@ func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
 	cfg.saved = make([]*Persister, cfg.n)
 	cfg.endnames = make([][]string, cfg.n)
 	cfg.logs = make([]map[int]interface{}, cfg.n)
+	cfg.lastApplied = make([]int, cfg.n)
 	cfg.start = time.Now()
 
 	cfg.setunreliable(unreliable)
@@ -175,57 +179,80 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
 	}
 }
 
+// returns "" or error string
+func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
+	r := bytes.NewBuffer(snapshot)
+	d := labgob.NewDecoder(r)
+	var lastIncludedIndex int
+	var xlog []interface{}
+	if d.Decode(&lastIncludedIndex) != nil ||
+		d.Decode(&xlog) != nil {
+		log.Fatalf("snapshot decode error\n")
+		return "snapshot Decode() error"
+	}
+	if index != -1 && index != lastIncludedIndex {
+		err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
+		return err
+	}
+	cfg.logs[i] = map[int]interface{}{}
+	for j := 0; j < len(xlog); j++ {
+		cfg.logs[i][j] = xlog[j]
+	}
+	cfg.lastApplied[i] = lastIncludedIndex
+	return ""
+}
+
 const SnapShotInterval = 10
 
 // periodically snapshot raft state
 func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
-	lastApplied := 0
 	for m := range applyCh {
+		err_msg := ""
 		if m.SnapshotValid {
-			//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
 			cfg.mu.Lock()
 			if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
 				m.SnapshotIndex, m.Snapshot) {
-				cfg.logs[i] = make(map[int]interface{})
-				r := bytes.NewBuffer(m.Snapshot)
-				d := labgob.NewDecoder(r)
-				var v int
-				if d.Decode(&v) != nil {
-					log.Fatalf("decode error\n")
+				err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
+			}
+			cfg.mu.Unlock()
+		} else if m.CommandValid {
+			if m.CommandIndex != cfg.lastApplied[i]+1 {
+				err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
+			}
+
+			if err_msg == "" {
+				cfg.mu.Lock()
+				var prevok bool
+				err_msg, prevok = cfg.checkLogs(i, m)
+				cfg.mu.Unlock()
+				if m.CommandIndex > 1 && prevok == false {
+					err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
 				}
-				cfg.logs[i][m.SnapshotIndex] = v
-				lastApplied = m.SnapshotIndex
 			}
-			cfg.mu.Unlock()
-		} else if m.CommandValid && m.CommandIndex > lastApplied {
-			//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
+
 			cfg.mu.Lock()
-			err_msg, prevok := cfg.checkLogs(i, m)
+			cfg.lastApplied[i] = m.CommandIndex
 			cfg.mu.Unlock()
-			if m.CommandIndex > 1 && prevok == false {
-				err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
-			}
-			if err_msg != "" {
-				log.Fatalf("apply error: %v\n", err_msg)
-				cfg.applyErr[i] = err_msg
-				// keep reading after error so that Raft doesn't block
-				// holding locks...
-			}
-			lastApplied = m.CommandIndex
+
 			if (m.CommandIndex+1)%SnapShotInterval == 0 {
 				w := new(bytes.Buffer)
 				e := labgob.NewEncoder(w)
-				v := m.Command
-				e.Encode(v)
+				e.Encode(m.CommandIndex)
+				var xlog []interface{}
+				for j := 0; j <= m.CommandIndex; j++ {
+					xlog = append(xlog, cfg.logs[i][j])
+				}
+				e.Encode(xlog)
 				cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
 			}
 		} else {
-			// Ignore other types of ApplyMsg or old
-			// commands. Old command may never happen,
-			// depending on the Raft implementation, but
-			// just in case.
-			// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
-
+			// Ignore other types of ApplyMsg.
+		}
+		if err_msg != "" {
+			log.Fatalf("apply error: %v\n", err_msg)
+			cfg.applyErr[i] = err_msg
+			// keep reading after error so that Raft doesn't block
+			// holding locks...
 		}
 	}
 }
@@ -256,12 +283,24 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
 
 	cfg.mu.Lock()
 
+	cfg.lastApplied[i] = 0
+
 	// a fresh persister, so old instance doesn't overwrite
 	// new instance's persisted state.
 	// but copy old persister's content so that we always
 	// pass Make() the last persisted state.
 	if cfg.saved[i] != nil {
 		cfg.saved[i] = cfg.saved[i].Copy()
+
+		snapshot := cfg.saved[i].ReadSnapshot()
+		if snapshot != nil && len(snapshot) > 0 {
+			// mimic KV server and process snapshot now.
+			// ideally Raft should send it up on applyCh...
+			err := cfg.ingestSnap(i, snapshot, -1)
+			if err != "" {
+				cfg.t.Fatal(err)
+			}
+		}
 	} else {
 		cfg.saved[i] = MakePersister()
 	}
@@ -291,7 +330,13 @@ func (cfg *config) checkTimeout() {
 	}
 }
 
+func (cfg *config) checkFinished() bool {
+	z := atomic.LoadInt32(&cfg.finished)
+	return z != 0
+}
+
 func (cfg *config) cleanup() {
+	atomic.StoreInt32(&cfg.finished, 1)
 	for i := 0; i < len(cfg.rafts); i++ {
 		if cfg.rafts[i] != nil {
 			cfg.rafts[i].Kill()
@@ -367,8 +412,13 @@ func (cfg *config) setlongreordering(longrel bool) {
 	cfg.net.LongReordering(longrel)
 }
 
-// check that there's exactly one leader.
+//
+// check that one of the connected servers thinks
+// it is the leader, and that no other connected
+// server thinks otherwise.
+//
 // try a few times in case re-elections are needed.
+//
 func (cfg *config) checkOneLeader() int {
 	for iters := 0; iters < 10; iters++ {
 		ms := 450 + (rand.Int63() % 100)
@@ -417,13 +467,16 @@ func (cfg *config) checkTerms() int {
 	return term
 }
 
-// check that there's no leader
+//
+// check that none of the connected servers
+// thinks it is the leader.
+//
 func (cfg *config) checkNoLeader() {
 	for i := 0; i < cfg.n; i++ {
 		if cfg.connected[i] {
 			_, is_leader := cfg.rafts[i].GetState()
 			if is_leader {
-				cfg.t.Fatalf("expected no leader, but %v claims to be leader", i)
+				cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
 			}
 		}
 	}
@@ -500,7 +553,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} {
 func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
 	t0 := time.Now()
 	starts := 0
-	for time.Since(t0).Seconds() < 10 {
+	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
 		// try all the servers, maybe one is the leader.
 		index := -1
 		for si := 0; si < cfg.n; si++ {
@@ -542,7 +595,9 @@ func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
 			time.Sleep(50 * time.Millisecond)
 		}
 	}
-	cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
+	if cfg.checkFinished() == false {
+		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
+	}
 	return -1
 }
 
diff --git a/src/raft/test_test.go b/src/raft/test_test.go
index 914bc5d..f9f67bd 100644
--- a/src/raft/test_test.go
+++ b/src/raft/test_test.go
@@ -64,15 +64,19 @@ func TestReElection2A(t *testing.T) {
 	cfg.checkOneLeader()
 
 	// if the old leader rejoins, that shouldn't
-	// disturb the new leader.
+	// disturb the new leader. and the old leader
+	// should switch to follower.
 	cfg.connect(leader1)
 	leader2 := cfg.checkOneLeader()
 
-	// if there's no quorum, no leader should
+	// if there's no quorum, no new leader should
 	// be elected.
 	cfg.disconnect(leader2)
 	cfg.disconnect((leader2 + 1) % servers)
 	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that the one connected server
+	// does not think it is the leader.
 	cfg.checkNoLeader()
 
 	// if a quorum arises, it should elect a leader.
@@ -177,12 +181,105 @@ func TestRPCBytes2B(t *testing.T) {
 	cfg.end()
 }
 
+//
+// test just failure of followers.
+//
+func For2023TestFollowerFailure2B(t *testing.T) {
+	servers := 3
+	cfg := make_config(t, servers, false, false)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2B): test progressive failure of followers")
+
+	cfg.one(101, servers, false)
+
+	// disconnect one follower from the network.
+	leader1 := cfg.checkOneLeader()
+	cfg.disconnect((leader1 + 1) % servers)
+
+	// the leader and remaining follower should be
+	// able to agree despite the disconnected follower.
+	cfg.one(102, servers-1, false)
+	time.Sleep(RaftElectionTimeout)
+	cfg.one(103, servers-1, false)
+
+	// disconnect the remaining follower
+	leader2 := cfg.checkOneLeader()
+	cfg.disconnect((leader2 + 1) % servers)
+	cfg.disconnect((leader2 + 2) % servers)
+
+	// submit a command.
+	index, _, ok := cfg.rafts[leader2].Start(104)
+	if ok != true {
+		t.Fatalf("leader rejected Start()")
+	}
+	if index != 4 {
+		t.Fatalf("expected index 4, got %v", index)
+	}
+
+	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that command 104 did not commit.
+	n, _ := cfg.nCommitted(index)
+	if n > 0 {
+		t.Fatalf("%v committed but no majority", n)
+	}
+
+	cfg.end()
+}
+
+//
+// test just failure of leaders.
+//
+func For2023TestLeaderFailure2B(t *testing.T) {
+	servers := 3
+	cfg := make_config(t, servers, false, false)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2B): test failure of leaders")
+
+	cfg.one(101, servers, false)
+
+	// disconnect the first leader.
+	leader1 := cfg.checkOneLeader()
+	cfg.disconnect(leader1)
+
+	// the remaining followers should elect
+	// a new leader.
+	cfg.one(102, servers-1, false)
+	time.Sleep(RaftElectionTimeout)
+	cfg.one(103, servers-1, false)
+
+	// disconnect the new leader.
+	leader2 := cfg.checkOneLeader()
+	cfg.disconnect(leader2)
+
+	// submit a command to each server.
+	for i := 0; i < servers; i++ {
+		cfg.rafts[i].Start(104)
+	}
+
+	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that command 104 did not commit.
+	n, _ := cfg.nCommitted(4)
+	if n > 0 {
+		t.Fatalf("%v committed but no majority", n)
+	}
+
+	cfg.end()
+}
+
+//
+// test that a follower participates after
+// disconnect and re-connect.
+//
 func TestFailAgree2B(t *testing.T) {
 	servers := 3
 	cfg := make_config(t, servers, false, false)
 	defer cfg.cleanup()
 
-	cfg.begin("Test (2B): agreement despite follower disconnection")
+	cfg.begin("Test (2B): agreement after follower reconnects")
 
 	cfg.one(101, servers, false)
 
@@ -1037,12 +1134,22 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash
 			cfg.crash1(victim)
 			cfg.one(rand.Int(), servers-1, true)
 		}
-		// send enough to get a snapshot
-		for i := 0; i < SnapShotInterval+1; i++ {
+
+		// perhaps send enough to get a snapshot
+		nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
+		for i := 0; i < nn; i++ {
 			cfg.rafts[sender].Start(rand.Int())
 		}
+
 		// let applier threads catch up with the Start()'s
-		cfg.one(rand.Int(), servers-1, true)
+		if disconnect == false && crash == false {
+			// make sure all followers have caught up, so that
+			// an InstallSnapshot RPC isn't required for
+			// TestSnapshotBasic2D().
+			cfg.one(rand.Int(), servers, true)
+		} else {
+			cfg.one(rand.Int(), servers-1, true)
+		}
 		if cfg.LogSize() >= MAXLOGSIZE {
 			cfg.t.Fatalf("Log size too large")
 		}
@@ -1084,3 +1191,46 @@ func TestSnapshotInstallCrash2D(t *testing.T) {
 func TestSnapshotInstallUnCrash2D(t *testing.T) {
 	snapcommon(t, "Test (2D): install snapshots (unreliable+crash)", false, false, true)
 }
+
+//
+// do the servers persist the snapshots, and
+// restart using snapshot along with the
+// tail of the log?
+//
+func TestSnapshotAllCrash2D(t *testing.T) {
+	servers := 3
+	iters := 5
+	cfg := make_config(t, servers, false, true)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2D): crash and restart all servers")
+
+	cfg.one(rand.Int(), servers, true)
+
+	for i := 0; i < iters; i++ {
+		// perhaps enough to get a snapshot
+		nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
+		for i := 0; i < nn; i++ {
+			cfg.one(rand.Int(), servers, true)
+		}
+
+		index1 := cfg.one(rand.Int(), servers, true)
+
+		// crash all
+		for i := 0; i < servers; i++ {
+			cfg.crash1(i)
+		}
+
+		// revive all
+		for i := 0; i < servers; i++ {
+			cfg.start1(i, cfg.applierSnap)
+			cfg.connect(i)
+		}
+
+		index2 := cfg.one(rand.Int(), servers, true)
+		if index2 < index1+1 {
+			t.Fatalf("index decreased from %v to %v", index1, index2)
+		}
+	}
+	cfg.end()
+}
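
Note (illustrative sketch, not part of the patch): the snapshot bytes that applierSnap() now hands to cfg.rafts[i].Snapshot(), and that ingestSnap() later decodes, are simply a labgob-encoded pair: the index of the last included command, followed by the full []interface{} prefix of commands. A minimal round-trip sketch of that format, assuming it sits in package raft next to config.go (so bytes, fmt, and labgob are in scope); the helper names encodeSnap/decodeSnap are hypothetical and exist only for illustration:

	// encodeSnap mirrors what applierSnap() writes before calling Snapshot():
	// the last included index, then every command from 0 through that index.
	func encodeSnap(lastIncludedIndex int, xlog []interface{}) []byte {
		w := new(bytes.Buffer)
		e := labgob.NewEncoder(w)
		e.Encode(lastIncludedIndex)
		e.Encode(xlog)
		return w.Bytes()
	}

	// decodeSnap mirrors what ingestSnap() reads back, whether the snapshot
	// arrives on applyCh or is re-read from the persister in start1().
	func decodeSnap(snapshot []byte) (int, []interface{}, error) {
		r := bytes.NewBuffer(snapshot)
		d := labgob.NewDecoder(r)
		var lastIncludedIndex int
		var xlog []interface{}
		if d.Decode(&lastIncludedIndex) != nil || d.Decode(&xlog) != nil {
			return 0, nil, fmt.Errorf("snapshot decode error")
		}
		return lastIncludedIndex, xlog, nil
	}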