This commit is contained in:
Robert Morris 2022-03-02 04:48:31 -05:00
parent 732981285f
commit d94ea16780
2 changed files with 22 additions and 12 deletions

View File

@ -170,7 +170,7 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
}
if err_msg != "" {
log.Fatalf("apply error: %v\n", err_msg)
log.Fatalf("apply error: %v", err_msg)
cfg.applyErr[i] = err_msg
// keep reading after error so that Raft doesn't block
// holding locks...
@ -181,13 +181,17 @@ func (cfg *config) applier(i int, applyCh chan ApplyMsg) {
// returns "" or error string
func (cfg *config) ingestSnap(i int, snapshot []byte, index int) string {
if snapshot == nil {
log.Fatalf("nil snapshot")
return "nil snapshot"
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var lastIncludedIndex int
var xlog []interface{}
if d.Decode(&lastIncludedIndex) != nil ||
d.Decode(&xlog) != nil {
log.Fatalf("snapshot decode error\n")
log.Fatalf("snapshot decode error")
return "snapshot Decode() error"
}
if index != -1 && index != lastIncludedIndex {
@ -206,15 +210,21 @@ const SnapShotInterval = 10
// periodically snapshot raft state
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.mu.Lock()
rf := cfg.rafts[i]
cfg.mu.Unlock()
if rf == nil {
return // ???
}
for m := range applyCh {
err_msg := ""
if m.SnapshotValid {
if rf.CondInstallSnapshot(m.SnapshotTerm, m.SnapshotIndex, m.Snapshot) {
cfg.mu.Lock()
if cfg.rafts[i].CondInstallSnapshot(m.SnapshotTerm,
m.SnapshotIndex, m.Snapshot) {
err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
}
cfg.mu.Unlock()
}
} else if m.CommandValid {
if m.CommandIndex != cfg.lastApplied[i]+1 {
err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
@ -243,13 +253,13 @@ func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
xlog = append(xlog, cfg.logs[i][j])
}
e.Encode(xlog)
cfg.rafts[i].Snapshot(m.CommandIndex, w.Bytes())
rf.Snapshot(m.CommandIndex, w.Bytes())
}
} else {
// Ignore other types of ApplyMsg.
}
if err_msg != "" {
log.Fatalf("apply error: %v\n", err_msg)
log.Fatalf("apply error: %v", err_msg)
cfg.applyErr[i] = err_msg
// keep reading after error so that Raft doesn't block
// holding locks...
@ -497,7 +507,7 @@ func (cfg *config) nCommitted(index int) (int, interface{}) {
if ok {
if count > 0 && cmd != cmd1 {
cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n",
cfg.t.Fatalf("committed values do not match: index %v, %v, %v",
index, cmd, cmd1)
}
count += 1
@ -532,7 +542,7 @@ func (cfg *config) wait(index int, n int, startTerm int) interface{} {
}
nd, cmd := cfg.nCommitted(index)
if nd < n {
cfg.t.Fatalf("only %d decided for index %d; wanted %d\n",
cfg.t.Fatalf("only %d decided for index %d; wanted %d",
nd, index, n)
}
return cmd