From 2a0c44f616373312a53fc910f58ea281ea0eb1c9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Sat, 3 Feb 2024 16:08:57 +0000 Subject: [PATCH 1/3] tests: clean-up raftLog.findConflict test Signed-off-by: Pavel Kalinnikov --- log_test.go | 42 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/log_test.go b/log_test.go index 90ae9388..a6fe83d8 100644 --- a/log_test.go +++ b/log_test.go @@ -25,32 +25,30 @@ import ( func TestFindConflict(t *testing.T) { previousEnts := index(1).terms(1, 2, 3) - tests := []struct { - ents []pb.Entry - wconflict uint64 + for _, tt := range []struct { + ents []pb.Entry + want uint64 }{ - // no conflict, empty ent - {nil, 0}, + // no conflict, empty entries + {ents: nil, want: 0}, // no conflict - {index(1).terms(1, 2, 3), 0}, - {index(2).terms(2, 3), 0}, - {index(3).terms(3), 0}, + {ents: index(1).terms(1, 2, 3), want: 0}, + {ents: index(2).terms(2, 3), want: 0}, + {ents: index(3).terms(3), want: 0}, // no conflict, but has new entries - {index(1).terms(1, 2, 3, 4, 4), 4}, - {index(2).terms(2, 3, 4, 5), 4}, - {index(3).terms(3, 4, 4), 4}, - {index(4).terms(4, 4), 4}, + {ents: index(1).terms(1, 2, 3, 4, 4), want: 4}, + {ents: index(2).terms(2, 3, 4, 5), want: 4}, + {ents: index(3).terms(3, 4, 4), want: 4}, + {ents: index(4).terms(4, 4), want: 4}, // conflicts with existing entries - {index(1).terms(4, 4), 1}, - {index(2).terms(1, 4, 4), 2}, - {index(3).terms(1, 2, 4, 4), 3}, - } - - for i, tt := range tests { - t.Run(fmt.Sprint(i), func(t *testing.T) { - raftLog := newLog(NewMemoryStorage(), raftLogger) - raftLog.append(previousEnts...) - require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents)) + {ents: index(1).terms(4, 4), want: 1}, + {ents: index(2).terms(1, 4, 4), want: 2}, + {ents: index(3).terms(1, 2, 4, 4), want: 3}, + } { + t.Run("", func(t *testing.T) { + log := newLog(NewMemoryStorage(), discardLogger) + log.append(previousEnts...) 
+ require.Equal(t, tt.want, log.findConflict(tt.ents)) }) } } From 17f5596afc65a881ee5b3a982d49374d5a14c383 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 2 Feb 2024 01:49:43 +0000 Subject: [PATCH 2/3] log: clean-up log conflict search Use the new entryID type. Move the preceding entry check into the conflict search method rather than do it outside. Signed-off-by: Pavel Kalinnikov --- log.go | 94 +++++++++++++++++++++++++++++++---------------------- log_test.go | 49 +++++++++++++++++++--------- 2 files changed, 89 insertions(+), 54 deletions(-) diff --git a/log.go b/log.go index bd7c2feb..a9416caf 100644 --- a/log.go +++ b/log.go @@ -105,27 +105,24 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) { - if !l.matchTerm(a.prev) { + match, ok := l.findConflict(a) + if !ok { return 0, false } - // TODO(pav-kv): propagate logSlice down the stack. It will be used all the - // way down in unstable, for safety checks, and for useful bookkeeping. - - lastnewi = a.prev.index + uint64(len(a.entries)) - ci := l.findConflict(a.entries) - switch { - case ci == 0: - case ci <= l.committed: - l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) - default: - offset := a.prev.index + 1 - if ci-offset > uint64(len(a.entries)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries)) - } - l.append(a.entries[ci-offset:]...) + if match.index < a.lastIndex() && match.index < l.committed { + l.logger.Panicf("entry %d is already committed [committed(%d)]", match.index+1, l.committed) } - l.commitTo(min(committed, lastnewi)) - return lastnewi, true + + // Fast-forward to the first mismatching or missing entry. + // NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe. 
+ a.entries = a.entries[match.index-a.prev.index:] + a.prev = match + + // TODO(pav-kv): pass the logSlice down the stack, for safety checks and + // bookkeeping in the unstable structure. + l.append(a.entries...) + l.commitTo(min(committed, a.lastIndex())) + return a.lastIndex(), true } func (l *raftLog) append(ents ...pb.Entry) uint64 { @@ -139,29 +136,48 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { return l.lastIndex() } -// findConflict finds the index of the conflict. -// It returns the first pair of conflicting entries between the existing -// entries and the given entries, if there are any. -// If there is no conflicting entries, and the existing entries contains -// all the given entries, zero will be returned. -// If there is no conflicting entries, but the given entries contains new -// entries, the index of the first new entry will be returned. -// An entry is considered to be conflicting if it has the same index but -// a different term. -// The index of the given entries MUST be continuously increasing. -func (l *raftLog) findConflict(ents []pb.Entry) uint64 { - for i := range ents { - if id := pbEntryID(&ents[i]); !l.matchTerm(id) { - if id.index <= l.lastIndex() { - // TODO(pav-kv): can simply print %+v of the id. This will change the - // log format though. - l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) - } - return id.index +// findConflict finds the last entry in the given log slice that matches the +// log. The next entry either mismatches, or is missing. +// +// If the slice partially/fully matches, this method returns true. The returned +// entryID is the ID of the last matching entry. It can be s.prev if it is the +// only matching entry. It is guaranteed that the returned entryID.index is in +// the [s.prev.index, s.lastIndex()] range. 
+// +// All the entries up to the returned entryID are already present in the log, +// and do not need to be appended again. The caller can safely fast-forward an +// append request to the next entry after it. +// +// Returns false if the given slice mismatches the log entirely, i.e. the s.prev +// entry has a mismatching entryID.term. In this case an append request can not +// proceed. +func (l *raftLog) findConflict(s logSlice) (entryID, bool) { + // TODO(pav-kv): add a fast-path here. If s.term == raftLog.lastTerm, we can + // skip the match checks entirely. We can double-check only the last entry + // match, to be sure, but it is not necessary if raft invariants are true. + if !l.matchTerm(s.prev) { + return entryID{}, false + } + + // TODO(pav-kv): every matchTerm call in the linear scan below can fall back + // to fetching an entry from storage. This is inefficient, we can improve it. + // NB: logs that don't match at one index, don't match at all indices above. + // So we can use binary search to find the fork. + match := s.prev + for i := range s.entries { + id := pbEntryID(&s.entries[i]) + if l.matchTerm(id) { + match = id + continue } + if id.index <= l.lastIndex() { + // TODO(pav-kv): should simply print %+v of the id. 
+ l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", + id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) + } + return match, true } - return 0 + return match, true // all entries match } // findConflictByTerm returns a best guess on where this log ends matching diff --git a/log_test.go b/log_test.go index a6fe83d8..8deed904 100644 --- a/log_test.go +++ b/log_test.go @@ -25,30 +25,49 @@ import ( func TestFindConflict(t *testing.T) { previousEnts := index(1).terms(1, 2, 3) + ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0 + for i := range previousEnts { + ids = append(ids, pbEntryID(&previousEnts[i])) + } for _, tt := range []struct { - ents []pb.Entry - want uint64 + prev entryID + ents []pb.Entry + notOk bool + want entryID }{ // no conflict, empty entries - {ents: nil, want: 0}, + {ents: nil, want: ids[0]}, + // prev does not match the log + {prev: entryID{term: 10, index: 1}, notOk: true}, // no conflict - {ents: index(1).terms(1, 2, 3), want: 0}, - {ents: index(2).terms(2, 3), want: 0}, - {ents: index(3).terms(3), want: 0}, + {prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]}, + {prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]}, + {prev: ids[2], ents: index(3).terms(3), want: ids[3]}, // no conflict, but has new entries - {ents: index(1).terms(1, 2, 3, 4, 4), want: 4}, - {ents: index(2).terms(2, 3, 4, 5), want: 4}, - {ents: index(3).terms(3, 4, 4), want: 4}, - {ents: index(4).terms(4, 4), want: 4}, - // conflicts with existing entries - {ents: index(1).terms(4, 4), want: 1}, - {ents: index(2).terms(1, 4, 4), want: 2}, - {ents: index(3).terms(1, 2, 4, 4), want: 3}, + {prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]}, + {prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]}, + {prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]}, + {prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]}, + // passes prev check, but conflicts with existing 
entries + {prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]}, + {prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]}, + {prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]}, + // prev does not match + {prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true}, + {prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true}, + // out of bounds + {prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true}, + // just touching the right bound, but still out of bounds + {prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true}, } { t.Run("", func(t *testing.T) { log := newLog(NewMemoryStorage(), discardLogger) log.append(previousEnts...) - require.Equal(t, tt.want, log.findConflict(tt.ents)) + app := logSlice{term: 100, prev: tt.prev, entries: tt.ents} + require.NoError(t, app.valid()) + match, ok := log.findConflict(app) + require.Equal(t, !tt.notOk, ok) + require.Equal(t, tt.want, match) }) } } From 85f43e65740047a9ff7301c9580d67b4b62b02df Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Sat, 3 Feb 2024 17:44:28 +0000 Subject: [PATCH 3/3] log: consolidate the committed entry rewrite check The append function call right after this does the same check, so we don't need to do this in two places. TestLogMaybeAppend exercises these panics, and confirms the behaviour is the same after the first panic is removed. Signed-off-by: Pavel Kalinnikov --- log.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/log.go b/log.go index a9416caf..b581ca0a 100644 --- a/log.go +++ b/log.go @@ -109,9 +109,6 @@ func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok if !ok { return 0, false } - if match.index < a.lastIndex() && match.index < l.committed { - l.logger.Panicf("entry %d is already committed [committed(%d)]", match.index+1, l.committed) - } // Fast-forward to the first mismatching or missing entry. 
// NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe. @@ -129,8 +126,8 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { if len(ents) == 0 { return l.lastIndex() } - if after := ents[0].Index - 1; after < l.committed { - l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) + if first := ents[0].Index; first <= l.committed { + l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex()