Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: clean-up log conflict search #155

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 55 additions & 42 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,63 +105,76 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
match, ok := l.findConflict(a)
if !ok {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(a.entries[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true

// Fast-forward to the first mismatching or missing entry.
// NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe.
a.entries = a.entries[match.index-a.prev.index:]
a.prev = match

// TODO(pav-kv): pass the logSlice down the stack, for safety checks and
// bookkeeping in the unstable structure.
l.append(a.entries...)
l.commitTo(min(committed, a.lastIndex()))
return a.lastIndex(), true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
if first := ents[0].Index; first <= l.committed {
l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}

// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for i := range ents {
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
if id.index <= l.lastIndex() {
// TODO(pav-kv): can simply print %+v of the id. This will change the
// log format though.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return id.index
// findConflict finds the last entry in the given log slice that matches the
// log. The next entry either mismatches, or is missing.
//
// If the slice partially/fully matches, this method returns true. The returned
// entryID is the ID of the last matching entry. It can be s.prev if it is the
// only matching entry. It is guaranteed that the returned entryID.index is in
// the [s.prev.index, s.lastIndex()] range.
//
// All the entries up to the returned entryID are already present in the log,
// and do not need to be appended again. The caller can safely fast-forward an
// append request to the next entry after it.
//
// Returns false if the given slice mismatches the log entirely, i.e. the s.prev
// entry has a mismatching entryID.term. In this case an append request can not
// proceed.
func (l *raftLog) findConflict(s logSlice) (entryID, bool) {
// TODO(pav-kv): add a fast-path here. If s.term == raftLog.lastTerm, we can
// skip the match checks entirely. We can double-check only the last entry
// match, to be sure, but it is not necessary if raft invariants are true.
if !l.matchTerm(s.prev) {
return entryID{}, false
}

// TODO(pav-kv): every matchTerm call in the linear scan below can fall back
// to fetching an entry from storage. This is inefficient, we can improve it.
// NB: logs that don't match at one index, don't match at all indices above.
// So we can use binary search to find the fork.
match := s.prev
for i := range s.entries {
id := pbEntryID(&s.entries[i])
if l.matchTerm(id) {
match = id
continue
}
if id.index <= l.lastIndex() {
// TODO(pav-kv): should simply print %+v of the id.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return match, true
}
return 0
return match, true // all entries match
}

// findConflictByTerm returns a best guess on where this log ends matching
Expand Down
63 changes: 40 additions & 23 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,49 @@ import (

func TestFindConflict(t *testing.T) {
previousEnts := index(1).terms(1, 2, 3)
tests := []struct {
ents []pb.Entry
wconflict uint64
ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0
for i := range previousEnts {
ids = append(ids, pbEntryID(&previousEnts[i]))
}
for _, tt := range []struct {
prev entryID
ents []pb.Entry
notOk bool
want entryID
}{
// no conflict, empty ent
{nil, 0},
// no conflict, empty entries
{ents: nil, want: ids[0]},
// prev does not match the log
{prev: entryID{term: 10, index: 1}, notOk: true},
// no conflict
{index(1).terms(1, 2, 3), 0},
{index(2).terms(2, 3), 0},
{index(3).terms(3), 0},
{prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3), want: ids[3]},
// no conflict, but has new entries
{index(1).terms(1, 2, 3, 4, 4), 4},
{index(2).terms(2, 3, 4, 5), 4},
{index(3).terms(3, 4, 4), 4},
{index(4).terms(4, 4), 4},
// conflicts with existing entries
{index(1).terms(4, 4), 1},
{index(2).terms(1, 4, 4), 2},
{index(3).terms(1, 2, 4, 4), 3},
}

for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents))
{prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]},
{prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]},
{prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]},
{prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]},
// passes prev check, but conflicts with existing entries
{prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]},
{prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]},
{prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]},
// prev does not match
{prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true},
{prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true},
// out of bounds
{prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true},
// just touching the right bound, but still out of bounds
{prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true},
} {
t.Run("", func(t *testing.T) {
log := newLog(NewMemoryStorage(), discardLogger)
log.append(previousEnts...)
app := logSlice{term: 100, prev: tt.prev, entries: tt.ents}
require.NoError(t, app.valid())
match, ok := log.findConflict(app)
require.Equal(t, !tt.notOk, ok)
require.Equal(t, tt.want, match)
})
}
}
Expand Down
Loading