Skip to content

Commit

Permalink
Implement Status Updater Retrying on Failures (#1062)
Browse files Browse the repository at this point in the history
Implement retries on status update failure.

Problem: NGF will not retry on status update failure, so there is a chance that some resources will not have up-to-date statuses.

Solution: Add retry logic when status update fails with a small exponential backoff after each retry. Also, added logic to allow for a graceful exit of the status updater when the NGF pod context is cancelled.
  • Loading branch information
bjee19 authored Sep 27, 2023
1 parent 8e57fe8 commit 8e8a2bf
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 54 deletions.
16 changes: 16 additions & 0 deletions internal/framework/status/k8s_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package status

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . K8sUpdater

// K8sUpdater updates a resource through the k8s API.
// Its single method has the same shape as the Update method of the status writer returned by
// the controller-runtime client's Status() call, so that writer can be passed wherever a K8sUpdater
// is expected, and a generated fake can stand in for it in unit tests.
type K8sUpdater interface {
// Update is from client.StatusClient.SubResourceWriter.
// It submits obj to the API, optionally tuned by opts, and returns any error reported by the call.
Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error
}
117 changes: 117 additions & 0 deletions internal/framework/status/statusfakes/fake_k8s_updater.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 83 additions & 27 deletions internal/framework/status/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package status

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/gateway-api/apis/v1beta1"

ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Updater
Expand Down Expand Up @@ -64,15 +68,11 @@ type UpdaterConfig struct {
// (b) k8s API can become slow or even timeout. This will increase every update status API call.
// Making UpdaterImpl asynchronous will prevent it from adding variable delays to the event loop.
//
// (3) It doesn't retry on failures. This means there is a chance that some resources will not have up-to-do statuses.
// Statuses are important part of the Gateway API, so we need to ensure that the Gateway always keep the resources
// statuses up-to-date.
//
// (4) It doesn't clear the statuses of a resources that are no longer handled by the Gateway. For example, if
// (3) It doesn't clear the statuses of resources that are no longer handled by the Gateway. For example, if
// an HTTPRoute resource no longer has the parentRef to the Gateway resources, the Gateway must update the status
// of the resource to remove the status about the removed parentRef.
//
// (5) If another controllers changes the status of the Gateway/HTTPRoute resource so that the information set by our
// (4) If another controller changes the status of the Gateway/HTTPRoute resource so that the information set by our
// Gateway is removed, our Gateway will not restore the status until the EventLoop invokes the StatusUpdater as a
// result of processing some other new change to a resource(s).
// FIXME(pleshakov): Make updater production ready
Expand Down Expand Up @@ -179,6 +179,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP

if upd.cfg.UpdateGatewayClassStatus {
for nsname, gcs := range statuses.GatewayClassStatuses {
select {
case <-ctx.Done():
return
default:
}
upd.writeStatuses(ctx, nsname, &v1beta1.GatewayClass{}, func(object client.Object) {
gc := object.(*v1beta1.GatewayClass)
gc.Status = prepareGatewayClassStatus(gcs, upd.cfg.Clock.Now())
Expand All @@ -188,6 +193,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP
}

for nsname, gs := range statuses.GatewayStatuses {
select {
case <-ctx.Done():
return
default:
}
upd.writeStatuses(ctx, nsname, &v1beta1.Gateway{}, func(object client.Object) {
gw := object.(*v1beta1.Gateway)
gw.Status = prepareGatewayStatus(gs, upd.cfg.PodIP, upd.cfg.Clock.Now())
Expand All @@ -200,7 +210,6 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP
return
default:
}

upd.writeStatuses(ctx, nsname, &v1beta1.HTTPRoute{}, func(object client.Object) {
hr := object.(*v1beta1.HTTPRoute)
// statuses.GatewayStatus is never nil when len(statuses.HTTPRouteStatuses) > 0
Expand All @@ -219,26 +228,19 @@ func (upd *UpdaterImpl) writeStatuses(
obj client.Object,
statusSetter func(client.Object),
) {
// The function handles errors by reporting them in the logs.
// We need to get the latest version of the resource.
// Otherwise, the Update status API call can fail.
// Note: the default client uses a cache for reads, so we're not making an unnecessary API call here.
// the default is configurable in the Manager options.
if err := upd.cfg.Client.Get(ctx, nsname, obj); err != nil {
if !apierrors.IsNotFound(err) {
upd.cfg.Logger.Error(
err,
"Failed to get the recent version the resource when updating status",
"namespace", nsname.Namespace,
"name", nsname.Name,
"kind", obj.GetObjectKind().GroupVersionKind().Kind)
}
return
}

statusSetter(obj)

if err := upd.cfg.Client.Status().Update(ctx, obj); err != nil {
err := wait.ExponentialBackoffWithContext(
ctx,
wait.Backoff{
Duration: time.Millisecond * 200,
Factor: 2,
Jitter: 0.5,
Steps: 4,
Cap: time.Millisecond * 3000,
},
// Function returns true if the condition is satisfied, or an error if the loop should be aborted.
NewRetryUpdateFunc(upd.cfg.Client, upd.cfg.Client.Status(), nsname, obj, upd.cfg.Logger, statusSetter),
)
if err != nil && !errors.Is(err, context.Canceled) {
upd.cfg.Logger.Error(
err,
"Failed to update status",
Expand All @@ -247,3 +249,57 @@ func (upd *UpdaterImpl) writeStatuses(
"kind", obj.GetObjectKind().GroupVersionKind().Kind)
}
}

// NewRetryUpdateFunc builds the condition function that is handed to wait.ExponentialBackoffWithContext
// when writing the status of a kubernetes resource. Exported for testing purposes.
//
// Each invocation of the returned function:
//   - fetches the latest version of the resource identified by nsname into obj using getter;
//   - applies statusSetter to obj;
//   - submits obj using updater.
//
// The returned function reports (true, nil) when the update succeeded or when the resource was not found,
// and (false, nil) when the get or the update failed. A (false, nil) result is what makes
// wait.ExponentialBackoffWithContext try again, so failures are logged and deliberately not returned as
// errors. The linter would otherwise complain about returning nil after an error was found.
//
//nolint:nilerr
func NewRetryUpdateFunc(
	getter controller.Getter,
	updater K8sUpdater,
	nsname types.NamespacedName,
	obj client.Object,
	logger logr.Logger,
	statusSetter func(client.Object),
) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		// Errors are handled here by reporting them in the logs.
		// The most recent version of the resource is fetched first; otherwise, the Update status API
		// call can fail.
		// Note: the default client serves reads from a cache, so this is not an unnecessary API call.
		// The default is configurable in the Manager options.
		getErr := getter.Get(ctx, nsname, obj)
		switch {
		case getErr == nil:
			// The resource was fetched; continue with the status update below.
		case apierrors.IsNotFound(getErr):
			// The resource can be missing because it was deleted,
			// so there is nothing to retry and no error to report.
			return true, nil
		default:
			logger.V(1).Info(
				"Encountered error when getting resource to update status",
				"error", getErr,
				"namespace", nsname.Namespace,
				"name", nsname.Name,
				"kind", obj.GetObjectKind().GroupVersionKind().Kind)
			return false, nil
		}

		statusSetter(obj)

		updateErr := updater.Update(ctx, obj)
		if updateErr == nil {
			return true, nil
		}

		logger.V(1).Info(
			"Encountered error updating status",
			"error", updateErr,
			"namespace", nsname.Namespace,
			"name", nsname.Name,
			"kind", obj.GetObjectKind().GroupVersionKind().Kind)
		return false, nil
	}
}
75 changes: 75 additions & 0 deletions internal/framework/status/updater_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package status_test

import (
"context"
"errors"
"testing"

. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller/controllerfakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status/statusfakes"
)

// TestNewRetryUpdateFunc checks the result of the function built by status.NewRetryUpdateFunc
// for each combination of outcomes of the fake Get and Update calls.
func TestNewRetryUpdateFunc(t *testing.T) {
	// Each case states what the fake getter and updater return and whether the
	// retry condition is then expected to be reported as satisfied.
	testCases := []struct {
		getReturns         error
		updateReturns      error
		name               string
		expConditionPassed bool
	}{
		{
			name:               "get fails",
			getReturns:         errors.New("failed to get resource"),
			updateReturns:      nil,
			expConditionPassed: false,
		},
		{
			name:               "get fails and apierrors is not found",
			getReturns:         apierrors.NewNotFound(schema.GroupResource{}, "not found"),
			updateReturns:      nil,
			expConditionPassed: true,
		},
		{
			name:               "update fails",
			getReturns:         nil,
			updateReturns:      errors.New("failed to update resource"),
			expConditionPassed: false,
		},
		{
			name:               "nothing fails",
			getReturns:         nil,
			updateReturns:      nil,
			expConditionPassed: true,
		},
	}

	// The fakes are shared by all subtests; every subtest sets both return values before use.
	statusUpdater := &statusfakes.FakeK8sUpdater{}
	getter := &controllerfakes.FakeGetter{}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			g := NewWithT(t)

			statusUpdater.UpdateReturns(tc.updateReturns)
			getter.GetReturns(tc.getReturns)

			retryFunc := status.NewRetryUpdateFunc(
				getter,
				statusUpdater,
				types.NamespacedName{},
				&v1beta1.GatewayClass{},
				zap.New(),
				func(client.Object) {},
			)
			passed, err := retryFunc(context.Background())

			// The function should always return nil.
			g.Expect(err).ToNot(HaveOccurred())
			g.Expect(passed).To(Equal(tc.expConditionPassed))
		})
	}
}
Loading

0 comments on commit 8e8a2bf

Please sign in to comment.