Skip to content

Commit

Permalink
Merge pull request kubernetes#24208 from ncdc/watch-cache-check-start…
Browse files Browse the repository at this point in the history
…ing-rv

Automatic merge from submit-queue

Honor starting resourceVersion in watch cache

Compare the requested resourceVersion to each event's resourceVersion to ensure events that occurred
in the past are not sent to the client.

Fixes kubernetes#24194 

cc @liggitt @wojtek-t
  • Loading branch information
k8s-merge-robot committed Apr 14, 2016
2 parents 1186f4b + 049e63d commit d800dca
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
13 changes: 8 additions & 5 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,

c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
c.watcherIdx++
return watcher, nil
Expand Down Expand Up @@ -465,15 +465,15 @@ type cacheWatcher struct {
forget func(bool)
}

func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
filter: filter,
stopped: false,
forget: forget,
}
go watcher.process(initEvents)
go watcher.process(initEvents, resourceVersion)
return watcher
}

Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
}
}

func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
defer utilruntime.HandleCrash()

for _, event := range initEvents {
Expand All @@ -550,6 +550,9 @@ func (c *cacheWatcher) process(initEvents []watchCacheEvent) {
if !ok {
return
}
c.sendWatchCacheEvent(event)
// only send events newer than resourceVersion
if event.ResourceVersion > resourceVersion {
c.sendWatchCacheEvent(event)
}
}
}
48 changes: 48 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,51 @@ func TestFiltering(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
}

func TestStartingResourceVersion(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
defer cacher.Stop()

// add 1 object
podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up Watch starting at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
rv += 10
startVersion := strconv.Itoa(int(rv))

watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

lastFoo := fooCreated
for i := 0; i < 11; i++ {
podFooForUpdate := makeTestPod("foo")
podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
}

select {
case e := <-watcher.ResultChan():
pod := e.Object.(*api.Pod)
podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= rv {
t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for event")
}
}
9 changes: 5 additions & 4 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ const (
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
Type watch.EventType
Object runtime.Object
PrevObject runtime.Object
ResourceVersion uint64
}

// watchCacheElement is a single "watch event" stored in a cache.
Expand Down Expand Up @@ -179,7 +180,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
if exists {
prevObject = previous.(runtime.Object)
}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject}
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
if w.onEvent != nil {
w.onEvent(watchCacheEvent)
}
Expand Down

0 comments on commit d800dca

Please sign in to comment.