From d94ea1678082307cb5bb6a36f6cd8eb0c117614a Mon Sep 17 00:00:00 2001 From: Robert Morris Date: Wed, 2 Mar 2022 04:48:31 -0500 Subject: [PATCH] update --- src/raft/config.go | 30 ++++++++++++++++++++---------- src/raft/raft.go | 4 ++-- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/raft/config.go b/src/raft/config.go index f3f4b90..cc956b8 100644 --- a/src/raft/config.go +++ b/src/raft/config.go @@ -170,7 +170,7 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) { err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) } if err_msg != "" { - log.Fatalf("apply error: %v\n", err_msg) + log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg // keep reading after error so that Raft doesn't block // holding locks... @@ -181,13 +181,17 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) { // returns "" or error string func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string { + if snapshot == nil { + log.Fatalf("nil snapshot") + return "nil snapshot" + } r := bytes.NewBuffer(snapshot) d := labgob.NewDecoder(r) var lastIncludedIndex int var xlog []interface{} if d.Decode(&lastIncludedIndex) != nil || d.Decode(&xlog) != nil { - log.Fatalf("snapshot decode error\n") + log.Fatalf("snapshot decode error") return "snapshot Decode() error" } if index != -1 && index != lastIncludedIndex { @@ -206,15 +210,21 @@ const SnapShotInterval = 10 // periodically snapshot raft state func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { + cfg.mu.Lock() + rf := cfg.rafts[i] + cfg.mu.Unlock() + if rf == nil { + return // ??? + } + for m := range applyCh { err_msg := "" if m.SnapshotValid { - cfg.mu.Lock() - if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm, - m.SnapshotIndex, m.Snapshot) { + if rf.CondInstallSnapshot(m.SnapshotTerm, m.SnapshotIndex, m.Snapshot) { + cfg.mu.Lock() err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex) + cfg.mu.Unlock() } - cfg.mu.Unlock() } else if m.CommandValid { if m.CommandIndex != cfg.lastApplied[i]+1 { err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex) @@ -243,13 +253,13 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) { xlog = append(xlog, cfg.logs[i][j]) } e.Encode(xlog) - cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes()) + rf.Snapshot(m.CommandIndex, w.Bytes()) } } else { // Ignore other types of ApplyMsg. } if err_msg != "" { - log.Fatalf("apply error: %v\n", err_msg) + log.Fatalf("apply error: %v", err_msg) cfg.applyErr[i] = err_msg // keep reading after error so that Raft doesn't block // holding locks... @@ -497,7 +507,7 @@ func (cfg *config) nCommitted(index int) (int, interface{}) { if ok { if count > 0 && cmd != cmd1 { - cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n", + cfg.t.Fatalf("committed values do not match: index %v, %v, %v", index, cmd, cmd1) } count += 1 @@ -532,7 +542,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} { } nd, cmd := cfg.nCommitted(index) if nd < n { - cfg.t.Fatalf("only %d decided for index %d; wanted %d\n", + cfg.t.Fatalf("only %d decided for index %d; wanted %d", nd, index, n) } return cmd diff --git a/src/raft/raft.go b/src/raft/raft.go index 29ea521..afe91ea 100644 --- a/src/raft/raft.go +++ b/src/raft/raft.go @@ -18,11 +18,11 @@ package raft // import ( -// "bytes" + // "bytes" "sync" "sync/atomic" -// "6.824/labgob" + // "6.824/labgob" "6.824/labrpc" )