Skip to content

Commit

Permalink
mvcc: tighten up watcher cancelation and revision handling
Browse files Browse the repository at this point in the history
Makes w.cur into w.minrev, the minimum revision for the next update, and
retries cancelation if the watcher isn't found (because it's being processed
by moveVictims).

Fixes: #5459
  • Loading branch information
Anthony Romano committed May 27, 2016
1 parent 59d44ea commit 1b02fb6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 45 deletions.
88 changes: 50 additions & 38 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,19 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
defer s.mu.Unlock()

wa := &watcher{
key: key,
end: end,
cur: startRev,
id: id,
ch: ch,
key: key,
end: end,
minRev: startRev,
id: id,
ch: ch,
}

s.store.mu.Lock()
synced := startRev > s.store.currentRev.main || startRev == 0
if synced {
wa.cur = s.store.currentRev.main + 1
if startRev > wa.cur {
wa.cur = startRev
wa.minRev = s.store.currentRev.main + 1
if startRev > wa.minRev {
wa.minRev = startRev
}
}
s.store.mu.Unlock()
Expand All @@ -214,30 +214,41 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
}
watcherGauge.Inc()

cancel := cancelFunc(func() {
return wa, func() { s.cancelWatcher(wa) }
}

// cancelWatcher removes references of the watcher from the watchableStore
func (s *watchableStore) cancelWatcher(wa *watcher) {
for {
s.mu.Lock()
// remove references of the watcher

if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
watcherGauge.Dec()
break
} else if s.synced.delete(wa) {
watcherGauge.Dec()
} else {
for _, wb := range s.victims {
if wb[wa] != nil {
slowWatcherGauge.Dec()
watcherGauge.Dec()
delete(wb, wa)
break
}
break
}

var victimBatch watcherBatch
for _, wb := range s.victims {
if wb[wa] != nil {
victimBatch = wb
break
}
}
s.mu.Unlock()
if victimBatch != nil {
slowWatcherGauge.Dec()
delete(victimBatch, wa)
break
}

// If we cannot find it, it should have finished watch.
})
// victim being processed so not accessible; retry
s.mu.Unlock()
time.Sleep(time.Millisecond)
}

return wa, cancel
watcherGauge.Dec()
s.mu.Unlock()
}

// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
Expand Down Expand Up @@ -307,7 +318,7 @@ func (s *watchableStore) moveVictims() (moved int) {
// try to send responses again
for w, eb := range wb {
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.minRev - 1}:
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
if newVictim == nil {
Expand All @@ -329,9 +340,9 @@ func (s *watchableStore) moveVictims() (moved int) {
continue
}
if eb.moreRev != 0 {
w.cur = eb.moreRev
w.minRev = eb.moreRev
}
if w.cur < curRev {
if w.minRev <= curRev {
s.unsynced.add(w)
} else {
slowWatcherGauge.Dec()
Expand Down Expand Up @@ -385,16 +396,20 @@ func (s *watchableStore) syncWatchers() {
var victims watcherBatch
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
w.minRev = curRev + 1

eb, ok := wb[w]
if !ok {
// bring un-notified watcher to synced
w.cur = curRev
s.synced.add(w)
s.unsynced.delete(w)
continue
}

w.cur = curRev
if eb.moreRev != 0 {
w.minRev = eb.moreRev
}

isBlocked := false
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
Expand All @@ -410,7 +425,7 @@ func (s *watchableStore) syncWatchers() {
victims[w] = eb
} else {
if eb.moreRev != 0 {
w.cur = eb.moreRev
// stay unsynced; more to read
continue
}
s.synced.add(w)
Expand Down Expand Up @@ -458,11 +473,11 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
plog.Panicf("unexpected multiple revisions in notification")
}
select {
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
pendingEventsGauge.Add(float64(len(eb.evs)))
default:
// move slow watcher to victims
w.cur = rev
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
Expand Down Expand Up @@ -508,12 +523,9 @@ type watcher struct {
// If end is set, the watcher is on a range.
end []byte

// cur is the current watcher revision of a unsynced watcher.
// cur will be updated for unsynced watcher while it is catching up.
// cur is startRev of a synced watcher.
// cur will not be updated for synced watcher.
cur int64
id WatchID
// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID

// a chan to send out the watch response.
// The chan might be shared with other watchers.
Expand Down
4 changes: 2 additions & 2 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func TestSyncWatchers(t *testing.T) {
}

for w := range sws {
if w.cur != s.Rev() {
t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
if w.minRev != s.Rev()+1 {
t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
}
}

Expand Down
10 changes: 5 additions & 5 deletions mvcc/watcher_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
wb := make(watcherBatch)
for _, ev := range evs {
for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
if ev.Kv.ModRevision >= w.cur {
if ev.Kv.ModRevision >= w.minRev {
// don't double notify
wb.add(w, ev)
}
Expand Down Expand Up @@ -233,10 +233,10 @@ func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watc
func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.cur > curRev {
if w.minRev > curRev {
panic("watcher current revision should not exceed current revision")
}
if w.cur < compactRev {
if w.minRev < compactRev {
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
wg.delete(w)
Expand All @@ -245,8 +245,8 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
}
continue
}
if minRev > w.cur {
minRev = w.cur
if minRev > w.minRev {
minRev = w.minRev
}
}
return minRev
Expand Down

0 comments on commit 1b02fb6

Please sign in to comment.