
Commit

Merge pull request projectcalico#1183 from lmm/lmm-refactor-errorthreshold

Refactor errorThreshold to fix data race
lmm authored Dec 14, 2019
2 parents 43756ea + 20d212d commit 2449a6f
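
Before this change, ErrorThreshold was a mutable package-level variable: the tests overwrote it to exercise different thresholds while watcher goroutines could still be reading it inside their watch loops, which the Go race detector reports as a data race. A minimal, self-contained sketch of that pattern (hypothetical names, not the Calico code itself):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // threshold stands in for the old package-level ErrorThreshold: shared,
    // unguarded state that any goroutine may read or write.
    var threshold = 15

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)

        // A long-running "watcher" loop reads the shared value on every
        // iteration, much like the watch loop's wc.errors > ErrorThreshold check.
        go func() {
            defer wg.Done()
            for errors := 0; errors < 1000; errors++ {
                if errors > threshold { // unsynchronized read
                    time.Sleep(time.Microsecond)
                }
            }
        }()

        // Meanwhile a "test" lowers the threshold mid-run: an unsynchronized
        // write that races with the read above.
        threshold = 3

        wg.Wait()
        fmt.Println("done; run with the -race flag to see the report")
    }

Running a program like this with the race detector enabled (go run -race) reports the conflicting read and write.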
Showing 2 changed files with 30 additions and 35 deletions.
24 changes: 13 additions & 11 deletions lib/backend/watchersyncer/watchercache.go
@@ -45,12 +45,13 @@ type watcherCache struct {
 	errors               int
 	resourceType         ResourceType
 	currentWatchRevision string
+	errorThreshold       int
 }

 var (
-	ListRetryInterval = 1000 * time.Millisecond
-	WatchPollInterval = 5000 * time.Millisecond
-	ErrorThreshold    = 15
+	ListRetryInterval     = 1000 * time.Millisecond
+	WatchPollInterval     = 5000 * time.Millisecond
+	DefaultErrorThreshold = 15
 )

// cacheEntry is an entry in our cache. It groups the a key with the last known
@@ -65,11 +66,12 @@ type cacheEntry struct {
 // Create a new watcherCache.
 func newWatcherCache(client api.Client, resourceType ResourceType, results chan<- interface{}) *watcherCache {
 	return &watcherCache{
-		logger:       logrus.WithField("ListRoot", model.ListOptionsToDefaultPathRoot(resourceType.ListInterface)),
-		client:       client,
-		resourceType: resourceType,
-		results:      results,
-		resources:    make(map[string]cacheEntry, 0),
+		logger:         logrus.WithField("ListRoot", model.ListOptionsToDefaultPathRoot(resourceType.ListInterface)),
+		client:         client,
+		resourceType:   resourceType,
+		results:        results,
+		resources:      make(map[string]cacheEntry, 0),
+		errorThreshold: DefaultErrorThreshold,
 	}
 }

@@ -134,7 +136,7 @@ mainLoop:
 			wc.onError()
 		}

-		if wc.errors > ErrorThreshold {
+		if wc.errors > wc.errorThreshold {
 			// Trigger a full resync if we're past the error threshold.
 			wc.currentWatchRevision = ""
 			wc.resyncAndCreateWatcher(ctx)
@@ -428,8 +430,8 @@ func (wc *watcherCache) markAsValid(resourceKey string) {
 // exceeds the error threshold. See finishResync() for how the watcherCache goes back to in-sync.
 func (wc *watcherCache) onError() {
 	wc.errors++
-	if wc.hasSynced && wc.errors > ErrorThreshold {
-		wc.logger.WithFields(logrus.Fields{"errors": wc.errors, "threshold": ErrorThreshold}).Debugf("Exceeded error threshold")
+	if wc.hasSynced && wc.errors > wc.errorThreshold {
+		wc.logger.WithFields(logrus.Fields{"errors": wc.errors, "threshold": wc.errorThreshold}).Debugf("Exceeded error threshold")
 		wc.hasSynced = false
 		wc.results <- api.WaitForDatastore
 	}
41 changes: 17 additions & 24 deletions lib/backend/watchersyncer/watchersyncer_test.go
@@ -119,11 +119,8 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 	It("should return WaitForDatastore on multiple consecutive watch errors", func() {
 		By("Sending errors to trigger a WaitForDatastore status update")

-		defaultThreshold := watchersyncer.ErrorThreshold
-		watchersyncer.ErrorThreshold = 6
-		defer func() {
-			watchersyncer.ErrorThreshold = defaultThreshold
-		}()
+		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
+		setErrorThreshold(6)

 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
@@ -133,7 +130,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {

 		rs.clientWatchResponse(r1, genError)
 		rs.ExpectStatusUnchanged()
-		for i := 0; i < watchersyncer.ErrorThreshold-1; i++ {
+		for i := 0; i < watchersyncer.DefaultErrorThreshold-1; i++ {
 			rs.clientListResponse(r1, genError)
 			rs.ExpectStatusUnchanged()
 		}
@@ -154,7 +151,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 		rs.clientWatchResponse(r1, nil)

 		// Watch is set but is now returning a generic WatchErrors
-		for i := 0; i < watchersyncer.ErrorThreshold; i++ {
+		for i := 0; i < watchersyncer.DefaultErrorThreshold; i++ {
 			rs.sendEvent(r1, genWatchError)
 			rs.ExpectStatusUnchanged()
 		}
@@ -164,6 +161,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 	})

 	It("should handle reconnection if watchers fail to be created", func() {
+		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
+		setErrorThreshold(3)
+
 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)

@@ -173,12 +173,6 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 		defer setWatchIntervals(watchersyncer.ListRetryInterval, watchersyncer.WatchPollInterval)
 		setWatchIntervals(500*time.Millisecond, 2000*time.Millisecond)

-		defaultThreshold := watchersyncer.ErrorThreshold
-		watchersyncer.ErrorThreshold = 3
-		defer func() {
-			watchersyncer.ErrorThreshold = defaultThreshold
-		}()
-
 		// All of the events should have been consumed within a time frame dictated by the
 		// list retry and poll timers.
 		//
@@ -208,7 +202,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 		rs.ExpectStatusUpdate(api.ResyncInProgress)
 		rs.clientWatchResponse(r1, genError)

-		for i := 0; i < watchersyncer.ErrorThreshold; i++ {
+		for i := 0; i < watchersyncer.DefaultErrorThreshold; i++ {
 			rs.clientListResponse(r1, genError)
 		}

@@ -272,11 +266,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 	})

 	It("Should handle reconnection and syncing when the watcher sends a watch terminated error", func() {
-		defaultThreshold := watchersyncer.ErrorThreshold
-		watchersyncer.ErrorThreshold = 0
-		defer func() {
-			watchersyncer.ErrorThreshold = defaultThreshold
-		}()
+
+		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
+		setErrorThreshold(0)

 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
@@ -305,11 +297,8 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 	})

 	It("Should not return WaitForDatastore on multiple watch errors due to ClosedByRemote exceeding error threshold", func() {
-		defaultThreshold := watchersyncer.ErrorThreshold
-		watchersyncer.ErrorThreshold = 0
-		defer func() {
-			watchersyncer.ErrorThreshold = defaultThreshold
-		}()
+		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
+		setErrorThreshold(0)

 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
@@ -819,6 +808,10 @@ func setWatchIntervals(listRetryInterval, watchPollInterval time.Duration) {
 	watchersyncer.WatchPollInterval = watchPollInterval
 }

+func setErrorThreshold(t int) {
+	watchersyncer.DefaultErrorThreshold = t
+}
+
 // Fake converter used to cover error and update handling paths.
 type fakeConverter struct {
 	i int
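
The shape of the fix, reduced to a sketch (simplified types, not the actual Calico structs): the package-level value becomes DefaultErrorThreshold, consulted only when a watcherCache is constructed, and from then on the watch loop reads its own errorThreshold field. Tests adjust the default via setErrorThreshold before building the syncer and restore it with a defer, so test writes no longer touch state a running loop is reading.

    package main

    import "fmt"

    // DefaultErrorThreshold is consulted only when a cache is constructed;
    // the long-running watch loop never reads it directly.
    var DefaultErrorThreshold = 15

    type watcherCache struct {
        errorThreshold int // per-instance copy read by the watch loop
    }

    func newWatcherCache() *watcherCache {
        return &watcherCache{errorThreshold: DefaultErrorThreshold}
    }

    func main() {
        DefaultErrorThreshold = 3      // a test adjusts the default up front...
        wc := newWatcherCache()        // ...before any watcher goroutine starts
        fmt.Println(wc.errorThreshold) // 3; later writes to the default cannot race with this cache's reads
    }

With that ordering, a run such as go test -race ./lib/backend/watchersyncer/... should no longer report a race on the threshold.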
