commit 732981285f
parent a997782d1d
Author: Robert Morris
Date:   2022-02-28 13:35:32 -05:00
2 changed files with 259 additions and 54 deletions

config.go

@@ -13,6 +13,7 @@ import "6.824/labrpc"
 import "bytes"
 import "log"
 import "sync"
+import "sync/atomic"
 import "testing"
 import "runtime"
 import "math/rand"
@ -39,6 +40,7 @@ func makeSeed() int64 {
type config struct {
mu sync.Mutex
t *testing.T
finished int32
net *labrpc.Network
n int
rafts []*Raft
@@ -47,6 +49,7 @@ type config struct {
 	saved []*Persister
 	endnames [][]string // the port file names each sends to
 	logs []map[int]interface{} // copy of each server's committed entries
+	lastApplied []int
 	start time.Time // time at which make_config() was called
 	// begin()/end() statistics
 	t0 time.Time // time at which test_test.go called cfg.begin()
@@ -77,6 +80,7 @@ func make_config(t *testing.T, n int, unreliable bool, snapshot bool) *config {
 	cfg.saved = make([]*Persister, cfg.n)
 	cfg.endnames = make([][]string, cfg.n)
 	cfg.logs = make([]map[int]interface{}, cfg.n)
+	cfg.lastApplied = make([]int, cfg.n)
 	cfg.start = time.Now()
 	cfg.setunreliable(unreliable)
@@ -175,58 +179,81 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
 	}
 }
 
+// returns "" or error string
+func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
+	r := bytes.NewBuffer(snapshot)
+	d := labgob.NewDecoder(r)
+	var lastIncludedIndex int
+	var xlog []interface{}
+	if d.Decode(&lastIncludedIndex) != nil ||
+		d.Decode(&xlog) != nil {
+		log.Fatalf("snapshot decode error\n")
+		return "snapshot Decode() error"
+	}
+	if index != -1 && index != lastIncludedIndex {
+		err := fmt.Sprintf("server %v snapshot doesn't match m.SnapshotIndex", i)
+		return err
+	}
+	cfg.logs[i] = map[int]interface{}{}
+	for j := 0; j < len(xlog); j++ {
+		cfg.logs[i][j] = xlog[j]
+	}
+	cfg.lastApplied[i] = lastIncludedIndex
+	return ""
+}
+
 const SnapShotInterval = 10
 
 // periodically snapshot raft state
 func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
-	lastApplied := 0
 	for m := range applyCh {
+		err_msg := ""
 		if m.SnapshotValid {
-			//DPrintf("Installsnapshot %v %v\n", m.SnapshotIndex, lastApplied)
 			cfg.mu.Lock()
 			if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
 				m.SnapshotIndex, m.Snapshot) {
-				cfg.logs[i] = make(map[int]interface{})
-				r := bytes.NewBuffer(m.Snapshot)
-				d := labgob.NewDecoder(r)
-				var v int
-				if d.Decode(&v) != nil {
-					log.Fatalf("decode error\n")
-				}
-				cfg.logs[i][m.SnapshotIndex] = v
-				lastApplied = m.SnapshotIndex
+				err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
 			}
 			cfg.mu.Unlock()
-		} else if m.CommandValid && m.CommandIndex > lastApplied {
-			//DPrintf("apply %v lastApplied %v\n", m.CommandIndex, lastApplied)
+		} else if m.CommandValid {
+			if m.CommandIndex != cfg.lastApplied[i]+1 {
+				err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
+			}
+			if err_msg == "" {
 				cfg.mu.Lock()
-			err_msg, prevok := cfg.checkLogs(i, m)
+				var prevok bool
+				err_msg, prevok = cfg.checkLogs(i, m)
 				cfg.mu.Unlock()
 				if m.CommandIndex > 1 && prevok == false {
 					err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
 				}
+			}
+			cfg.mu.Lock()
+			cfg.lastApplied[i] = m.CommandIndex
+			cfg.mu.Unlock()
+			if (m.CommandIndex+1)%SnapShotInterval == 0 {
+				w := new(bytes.Buffer)
+				e := labgob.NewEncoder(w)
+				e.Encode(m.CommandIndex)
+				var xlog []interface{}
+				for j := 0; j <= m.CommandIndex; j++ {
+					xlog = append(xlog, cfg.logs[i][j])
+				}
+				e.Encode(xlog)
+				cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
+			}
+		} else {
+			// Ignore other types of ApplyMsg.
+		}
 		if err_msg != "" {
-			log.Fatalf("apply error: %v\n", err_msg)
 			cfg.applyErr[i] = err_msg
 			// keep reading after error so that Raft doesn't block
 			// holding locks...
 		}
-			lastApplied = m.CommandIndex
-			if (m.CommandIndex+1)%SnapShotInterval == 0 {
-				w := new(bytes.Buffer)
-				e := labgob.NewEncoder(w)
-				v := m.Command
-				e.Encode(v)
-				cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
-			}
-		} else {
-			// Ignore other types of ApplyMsg or old
-			// commands. Old command may never happen,
-			// depending on the Raft implementation, but
-			// just in case.
-			// DPrintf("Ignore: Index %v lastApplied %v\n", m.CommandIndex, lastApplied)
-		}
 	}
 }
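
For context, the ApplyMsg type that applier() and applierSnap() consume, as defined in the lab's raft.go skeleton (CommandValid marks a committed log entry, SnapshotValid a complete snapshot):

type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}
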
@@ -256,12 +283,24 @@ func (cfg *config) start1(i int, applier func(int, chan ApplyMsg)) {
 	cfg.mu.Lock()
 
+	cfg.lastApplied[i] = 0
+
 	// a fresh persister, so old instance doesn't overwrite
 	// new instance's persisted state.
 	// but copy old persister's content so that we always
 	// pass Make() the last persisted state.
 	if cfg.saved[i] != nil {
 		cfg.saved[i] = cfg.saved[i].Copy()
+
+		snapshot := cfg.saved[i].ReadSnapshot()
+		if snapshot != nil && len(snapshot) > 0 {
+			// mimic KV server and process snapshot now.
+			// ideally Raft should send it up on applyCh...
+			err := cfg.ingestSnap(i, snapshot, -1)
+			if err != "" {
+				cfg.t.Fatal(err)
+			}
+		}
 	} else {
 		cfg.saved[i] = MakePersister()
 	}
@@ -291,7 +330,13 @@ func (cfg *config) checkTimeout() {
 	}
 }
 
+func (cfg *config) checkFinished() bool {
+	z := atomic.LoadInt32(&cfg.finished)
+	return z != 0
+}
+
 func (cfg *config) cleanup() {
+	atomic.StoreInt32(&cfg.finished, 1)
 	for i := 0; i < len(cfg.rafts); i++ {
 		if cfg.rafts[i] != nil {
 			cfg.rafts[i].Kill()
@@ -367,8 +412,13 @@ func (cfg *config) setlongreordering(longrel bool) {
 	cfg.net.LongReordering(longrel)
 }
 
-// check that there's exactly one leader.
+//
+// check that one of the connected servers thinks
+// it is the leader, and that no other connected
+// server thinks otherwise.
+//
 // try a few times in case re-elections are needed.
+//
 func (cfg *config) checkOneLeader() int {
 	for iters := 0; iters < 10; iters++ {
 		ms := 450 + (rand.Int63() % 100)
@@ -417,13 +467,16 @@ func (cfg *config) checkTerms() int {
 	return term
 }
 
-// check that there's no leader
+//
+// check that none of the connected servers
+// thinks it is the leader.
+//
 func (cfg *config) checkNoLeader() {
 	for i := 0; i < cfg.n; i++ {
 		if cfg.connected[i] {
 			_, is_leader := cfg.rafts[i].GetState()
 			if is_leader {
-				cfg.t.Fatalf("expected no leader, but %v claims to be leader", i)
+				cfg.t.Fatalf("expected no leader among connected servers, but %v claims to be leader", i)
 			}
 		}
 	}
@@ -500,7 +553,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} {
 func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
 	t0 := time.Now()
 	starts := 0
-	for time.Since(t0).Seconds() < 10 {
+	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
 		// try all the servers, maybe one is the leader.
 		index := -1
 		for si := 0; si < cfg.n; si++ {
@@ -542,7 +595,9 @@ func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
 			time.Sleep(50 * time.Millisecond)
 		}
 	}
-	cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
+	if cfg.checkFinished() == false {
+		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
+	}
 	return -1
 }
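
The snapshot format the tester now uses is the pair (lastIncludedIndex, xlog), encoded by applierSnap every SnapShotInterval commands and decoded by ingestSnap. A minimal stand-alone sketch of that round trip, written against the standard library's encoding/gob (which the lab's labgob package wraps):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

func main() {
	// Encode, as applierSnap does: the last included index, then the
	// applied commands indexed by log index (index 0 is unused).
	xlog := []interface{}{nil, 101, 102, 103}
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(3)    // lastIncludedIndex
	e.Encode(xlog) // commands through that index
	snapshot := w.Bytes()

	// Decode, as ingestSnap does, recovering both values.
	r := bytes.NewBuffer(snapshot)
	d := gob.NewDecoder(r)
	var lastIncludedIndex int
	var commands []interface{}
	d.Decode(&lastIncludedIndex)
	d.Decode(&commands)
	fmt.Println(lastIncludedIndex, commands) // 3 [<nil> 101 102 103]
}
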

test_test.go

@@ -64,15 +64,19 @@ func TestReElection2A(t *testing.T) {
 	cfg.checkOneLeader()
 
 	// if the old leader rejoins, that shouldn't
-	// disturb the new leader.
+	// disturb the new leader. and the old leader
+	// should switch to follower.
 	cfg.connect(leader1)
 	leader2 := cfg.checkOneLeader()
 
-	// if there's no quorum, no leader should
+	// if there's no quorum, no new leader should
 	// be elected.
 	cfg.disconnect(leader2)
 	cfg.disconnect((leader2 + 1) % servers)
 	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that the one connected server
+	// does not think it is the leader.
 	cfg.checkNoLeader()
 
 	// if a quorum arises, it should elect a leader.
@@ -177,12 +181,105 @@ func TestRPCBytes2B(t *testing.T) {
 	cfg.end()
 }
 
+//
+// test just failure of followers.
+//
+func For2023TestFollowerFailure2B(t *testing.T) {
+	servers := 3
+	cfg := make_config(t, servers, false, false)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2B): test progressive failure of followers")
+
+	cfg.one(101, servers, false)
+
+	// disconnect one follower from the network.
+	leader1 := cfg.checkOneLeader()
+	cfg.disconnect((leader1 + 1) % servers)
+
+	// the leader and remaining follower should be
+	// able to agree despite the disconnected follower.
+	cfg.one(102, servers-1, false)
+	time.Sleep(RaftElectionTimeout)
+	cfg.one(103, servers-1, false)
+
+	// disconnect the remaining follower
+	leader2 := cfg.checkOneLeader()
+	cfg.disconnect((leader2 + 1) % servers)
+	cfg.disconnect((leader2 + 2) % servers)
+
+	// submit a command.
+	index, _, ok := cfg.rafts[leader2].Start(104)
+	if ok != true {
+		t.Fatalf("leader rejected Start()")
+	}
+	if index != 4 {
+		t.Fatalf("expected index 4, got %v", index)
+	}
+
+	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that command 104 did not commit.
+	n, _ := cfg.nCommitted(index)
+	if n > 0 {
+		t.Fatalf("%v committed but no majority", n)
+	}
+
+	cfg.end()
+}
+
+//
+// test just failure of leaders.
+//
+func For2023TestLeaderFailure2B(t *testing.T) {
+	servers := 3
+	cfg := make_config(t, servers, false, false)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2B): test failure of leaders")
+
+	cfg.one(101, servers, false)
+
+	// disconnect the first leader.
+	leader1 := cfg.checkOneLeader()
+	cfg.disconnect(leader1)
+
+	// the remaining followers should elect
+	// a new leader.
+	cfg.one(102, servers-1, false)
+	time.Sleep(RaftElectionTimeout)
+	cfg.one(103, servers-1, false)
+
+	// disconnect the new leader.
+	leader2 := cfg.checkOneLeader()
+	cfg.disconnect(leader2)
+
+	// submit a command to each server.
+	for i := 0; i < servers; i++ {
+		cfg.rafts[i].Start(104)
+	}
+
+	time.Sleep(2 * RaftElectionTimeout)
+
+	// check that command 104 did not commit.
+	n, _ := cfg.nCommitted(4)
+	if n > 0 {
+		t.Fatalf("%v committed but no majority", n)
+	}
+
+	cfg.end()
+}
+
+//
+// test that a follower participates after
+// disconnect and re-connect.
+//
 func TestFailAgree2B(t *testing.T) {
 	servers := 3
 	cfg := make_config(t, servers, false, false)
 	defer cfg.cleanup()
 
-	cfg.begin("Test (2B): agreement despite follower disconnection")
+	cfg.begin("Test (2B): agreement after follower reconnects")
 
 	cfg.one(101, servers, false)
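
Note that Go's testing package only runs functions whose names begin with "Test", so "go test" skips For2023TestFollowerFailure2B and For2023TestLeaderFailure2B; the prefix suggests they are staged to become regular tests in a later year's lab.
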
@@ -1037,12 +1134,22 @@ func snapcommon(t *testing.T, name string, disconnect bool, reliable bool, crash
 			cfg.crash1(victim)
 			cfg.one(rand.Int(), servers-1, true)
 		}
 
-		// send enough to get a snapshot
-		for i := 0; i < SnapShotInterval+1; i++ {
+		// perhaps send enough to get a snapshot
+		nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
+		for i := 0; i < nn; i++ {
 			cfg.rafts[sender].Start(rand.Int())
 		}
 
 		// let applier threads catch up with the Start()'s
-		cfg.one(rand.Int(), servers-1, true)
+		if disconnect == false && crash == false {
+			// make sure all followers have caught up, so that
+			// an InstallSnapshot RPC isn't required for
+			// TestSnapshotBasic2D().
+			cfg.one(rand.Int(), servers, true)
+		} else {
+			cfg.one(rand.Int(), servers-1, true)
+		}
 
 		if cfg.LogSize() >= MAXLOGSIZE {
 			cfg.t.Fatalf("Log size too large")
@@ -1084,3 +1191,46 @@ func TestSnapshotInstallCrash2D(t *testing.T) {
 func TestSnapshotInstallUnCrash2D(t *testing.T) {
 	snapcommon(t, "Test (2D): install snapshots (unreliable+crash)", false, false, true)
 }
 
+//
+// do the servers persist the snapshots, and
+// restart using snapshot along with the
+// tail of the log?
+//
+func TestSnapshotAllCrash2D(t *testing.T) {
+	servers := 3
+	iters := 5
+	cfg := make_config(t, servers, false, true)
+	defer cfg.cleanup()
+
+	cfg.begin("Test (2D): crash and restart all servers")
+
+	cfg.one(rand.Int(), servers, true)
+
+	for i := 0; i < iters; i++ {
+		// perhaps enough to get a snapshot
+		nn := (SnapShotInterval / 2) + (rand.Int() % SnapShotInterval)
+		for i := 0; i < nn; i++ {
+			cfg.one(rand.Int(), servers, true)
+		}
+
+		index1 := cfg.one(rand.Int(), servers, true)
+
+		// crash all
+		for i := 0; i < servers; i++ {
+			cfg.crash1(i)
+		}
+
+		// revive all
+		for i := 0; i < servers; i++ {
+			cfg.start1(i, cfg.applierSnap)
+			cfg.connect(i)
+		}
+
+		index2 := cfg.one(rand.Int(), servers, true)
+		if index2 < index1+1 {
+			t.Fatalf("index decreased from %v to %v", index1, index2)
+		}
+	}
+
+	cfg.end()
+}
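
As with the rest of the suite, these tests can be run selectively with Go's -run regexp, e.g. "go test -run 2D" for the snapshot tests or "go test -run TestSnapshotAllCrash2D" for the new one alone.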