Skip to content

Commit

Permalink
mvcc: restore unsynced watchers
Browse files Browse the repository at this point in the history
In case syncWatchersLoop() starts before Restore() is called,
watchers already added by that moment are moved to s.synced by the loop.
However, there is a broken logic that moves watchers from s.synced
to s.uncyned without setting keyWatchers of the watcherGroup.
Eventually syncWatchers() fails to pickup those watchers from s.unsynced
and no events are sent to the watchers, because newWatcherBatch() called
in the function uses wg.watcherSetByKey() internally that requires
a proper keyWatchers value.
  • Loading branch information
Iwasaki Yudai committed Feb 6, 2018
1 parent 3903385 commit fcab10b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion internal/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}

for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
return nil
Expand Down
57 changes: 33 additions & 24 deletions internal/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,36 +297,45 @@ func TestWatchFutureRev(t *testing.T) {
}

func TestWatchRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)
testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)
newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1)
w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
time.Sleep(delay)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}

t.Run("Normal", test(0))
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
Expand Down

0 comments on commit fcab10b

Please sign in to comment.