fix: Handle watcher context cancel (#792)
Update the remediator's filteredwatcher to handle context
cancellation, which is used to shut down the remediator.
Previously, the cancel error was treated like any other error, and the
watcher would restart repeatedly until it hit the retry timeout (~4m).
Now the watcher exits promptly when its context is cancelled.

This also avoids logging the confusing watch error,
`unable to decode an event from the watch stream`,
when the context is cancelled during normal execution.

b/281728670
karlkfi authored Aug 8, 2023
1 parent 2b6a3bb commit 87a4ef9
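
For illustration only (not part of this commit): a minimal, standalone sketch of the cause-message check described in the commit message, built from the StatusError shape the dynamic client reports when the watch stream is torn down by context cancellation. The names isWatchContextCancelled and cancelledWatchMessage are hypothetical stand-ins for the helper and constant added in filteredwatcher.go below.

package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// cancelledWatchMessage is the message the watch stream decoder reports when
// the underlying context is cancelled mid-stream.
const cancelledWatchMessage = "unable to decode an event from the watch stream: context canceled"

// isWatchContextCancelled mirrors the commit's isContextCancelledStatusError:
// StatusError doesn't support errors.Is/Unwrap, so the cause type and message
// are inspected instead.
func isWatchContextCancelled(err error) bool {
	var statusErr *apierrors.StatusError
	if !errors.As(err, &statusErr) || statusErr.ErrStatus.Details == nil {
		return false
	}
	for _, cause := range statusErr.ErrStatus.Details.Causes {
		if cause.Type == metav1.CauseType("ClientWatchDecoding") && cause.Message == cancelledWatchMessage {
			return true
		}
	}
	return false
}

func main() {
	// Roughly the error shape produced when the watch is cancelled mid-stream.
	err := &apierrors.StatusError{ErrStatus: metav1.Status{
		Details: &metav1.StatusDetails{
			Causes: []metav1.StatusCause{{
				Type:    metav1.CauseType("ClientWatchDecoding"),
				Message: cancelledWatchMessage,
			}},
		},
	}}
	fmt.Println(isWatchContextCancelled(err)) // true
}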
Showing 4 changed files with 336 additions and 96 deletions.
126 changes: 99 additions & 27 deletions pkg/remediator/watch/filteredwatcher.go
@@ -16,6 +16,7 @@ package watch

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
@@ -53,6 +54,15 @@ const (
	// the watch timeout set by `ListOptions.TimeoutSeconds` in the watch
	// create requests.
	RESTConfigTimeout = time.Hour

	// ClientWatchDecodingCause is the status cause returned for client errors during response decoding.
	// https://github.com/kubernetes/client-go/blob/v0.26.7/rest/request.go#L785
	ClientWatchDecodingCause metav1.CauseType = "ClientWatchDecoding"

	// ContextCancelledCauseMessage is the error message from the DynamicClient
	// when the Watch method errors due to context cancellation.
	// https://github.com/kubernetes/apimachinery/blob/v0.26.7/pkg/watch/streamwatcher.go#L120
	ContextCancelledCauseMessage = "unable to decode an event from the watch stream: context canceled"
)

// maxWatchRetryFactor is used to determine when the next retry should happen.
@@ -234,16 +244,54 @@ func waitUntilNextRetry(retries int) {
	time.Sleep(duration)
}

// isContextCancelledStatusError returns true if the error is a *StatusError and
// one of the detail cause reasons is `ClientWatchDecoding` with the message
// `unable to decode an event from the watch stream: context canceled`.
// StatusError doesn't implement Is or Unwrap methods, and all http client
// errors are returned as decoding errors, so we have to test the cause message
// to detect context cancellation.
// This explicitly does not test for context timeout, because that's used for
// http client timeout, which we want to retry, and we don't currently have any
// timeout on the parent context.
func isContextCancelledStatusError(err error) bool {
	var statusErr *apierrors.StatusError
	if errors.As(err, &statusErr) {
		if message, found := findStatusErrorCauseMessage(statusErr, ClientWatchDecodingCause); found {
			if message == ContextCancelledCauseMessage {
				return true
			}
		}
	}
	return false
}

// findStatusErrorCauseMessage returns the message and true if the StatusError
// has a .details.causes[] entry that matches the specified type; otherwise it
// returns false.
func findStatusErrorCauseMessage(statusErr *apierrors.StatusError, causeType metav1.CauseType) (string, bool) {
	if statusErr == nil || statusErr.ErrStatus.Details == nil {
		return "", false
	}
	for _, cause := range statusErr.ErrStatus.Details.Causes {
		if cause.Type == causeType {
			return cause.Message, true
		}
	}
	return "", false
}

// Run reads the event from the base watch interface,
// filters the event and pushes the object contained
// in the event to the controller work queue.
func (w *filteredWatcher) Run(ctx context.Context) status.Error {
klog.Infof("Watch started for %s", w.gvk)
var resourceVersion string
var retriesForWatchError int
var runErr status.Error

Watcher:
for {
// There are three ways this function can return:
// There are three ways start can return:
// 1. false, error -> We were unable to start the watch, so exit Run().
// 2. false, nil -> We have been stopped via Stop(), so exit Run().
// 3. true, nil -> We have not been stopped and we started a new watch.
@@ -258,39 +306,59 @@ func (w *filteredWatcher) Run(ctx context.Context) status.Error {
		eventCount := 0
		ignoredEventCount := 0
		klog.V(2).Infof("(Re)starting watch for %s at resource version %q", w.gvk, resourceVersion)
		for event := range w.base.ResultChan() {
			w.pruneErrors()
			newVersion, ignoreEvent, err := w.handle(ctx, event)
			eventCount++
			if ignoreEvent {
				ignoredEventCount++
			}
			if err != nil {
				if isExpiredError(err) {
					klog.Infof("Watch for %s at resource version %q closed with: %v", w.gvk, resourceVersion, err)
					// `w.handle` may fail because we try to watch an old resource version, setting
					// a watch on an old resource version will always fail.
					// Reset `resourceVersion` to an empty string here so that we can start a new
					// watch at the most recent resource version.
					resourceVersion = ""
				} else if w.addError(watchEventErrorType + errorID(err)) {
					klog.Errorf("Watch for %s at resource version %q ended with: %v", w.gvk, resourceVersion, err)
		eventCh := w.base.ResultChan()
	EventHandler:
		for {
			select {
			case <-ctx.Done():
				runErr = status.InternalWrapf(ctx.Err(), "remediator watch stopped for %s", w.gvk)
				// Stop the watcher & return the status error
				break Watcher
			case event, ok := <-eventCh:
				if !ok { // channel closed
					// Restart the watcher
					break EventHandler
				}
				w.pruneErrors()
				newVersion, ignoreEvent, err := w.handle(ctx, event)
				eventCount++
				if ignoreEvent {
					ignoredEventCount++
				}
				if err != nil {
					if errors.Is(err, context.Canceled) || isContextCancelledStatusError(err) {
						// The error wrappers are especially confusing for
						// users, so just return context.Canceled.
						runErr = status.InternalWrapf(context.Canceled, "remediator watch stopped for %s", w.gvk)
						// Stop the watcher & return the status error
						break Watcher
					}
					if isExpiredError(err) {
						klog.Infof("Watch for %s at resource version %q closed with: %v", w.gvk, resourceVersion, err)
						// `w.handle` may fail because we try to watch an old resource version, setting
						// a watch on an old resource version will always fail.
						// Reset `resourceVersion` to an empty string here so that we can start a new
						// watch at the most recent resource version.
						resourceVersion = ""
					} else if w.addError(watchEventErrorType + errorID(err)) {
						klog.Errorf("Watch for %s at resource version %q ended with: %v", w.gvk, resourceVersion, err)
					}
					retriesForWatchError++
					waitUntilNextRetry(retriesForWatchError)
					// Restart the watcher
					break EventHandler
				}
				retriesForWatchError = 0
				if newVersion != "" {
					resourceVersion = newVersion
				}
				retriesForWatchError++
				waitUntilNextRetry(retriesForWatchError)
				// Call `break` to restart the watch.
				break
			}
			retriesForWatchError = 0
			if newVersion != "" {
				resourceVersion = newVersion
			}
		}
		klog.V(2).Infof("Ending watch for %s at resource version %q (total events: %d, ignored events: %d)",
			w.gvk, resourceVersion, eventCount, ignoredEventCount)
	}
	klog.Infof("Watch stopped for %s", w.gvk)
	return nil
	return runErr
}

// start initiates a new base watch at the given resource version in a
@@ -318,6 +386,9 @@ func (w *filteredWatcher) start(ctx context.Context, resourceVersion string) (bo

	base, err := w.startWatch(ctx, options)
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return false, status.InternalWrapf(err, "failed to start watch for %s", w.gvk)
		}
		return false, status.APIServerErrorf(err, "failed to start watch for %s", w.gvk)
	}
	w.base = base
@@ -351,6 +422,7 @@ func errorID(err error) string {
// and an error indicating that a watch.Error event type is encountered and the
// watch should be restarted.
func (w *filteredWatcher) handle(ctx context.Context, event watch.Event) (string, bool, error) {
klog.Infof("Handling watch event %v %v", event.Type, w.gvk)
var deleted bool
switch event.Type {
case watch.Added, watch.Modified:

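A second sketch, also not from this commit: the select pattern the reworked Run loop relies on, waiting on ctx.Done() alongside the watch result channel so shutdown does not have to wait for the stream to fail with a decode error. Only the generic watch.Interface from k8s.io/apimachinery is assumed; drainWatch is a hypothetical helper.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/watch"
)

// drainWatch consumes events until the context is cancelled or the stream
// closes. Selecting on ctx.Done() lets the caller stop promptly, mirroring
// the `break Watcher` path added in the diff.
func drainWatch(ctx context.Context, w watch.Interface) error {
	defer w.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // prompt exit on cancellation
		case e, ok := <-w.ResultChan():
			if !ok {
				return nil // channel closed; the real watcher restarts here
			}
			fmt.Println("event:", e.Type)
		}
	}
}

func main() {
	fw := watch.NewFake()
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel() // simulate remediator shutdown
	}()
	fmt.Println(drainWatch(ctx, fw)) // context canceled
}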