diff --git a/internal/framework/status/k8s_updater.go b/internal/framework/status/k8s_updater.go new file mode 100644 index 000000000..e249c6b02 --- /dev/null +++ b/internal/framework/status/k8s_updater.go @@ -0,0 +1,16 @@ +package status + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . K8sUpdater + +// K8sUpdater updates a resource in the k8s API. +// It allows us to mock the client.Client.Status().Update method. +type K8sUpdater interface { + // Update is from client.StatusClient.SubResourceWriter. + Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error +} diff --git a/internal/framework/status/statusfakes/fake_k8s_updater.go b/internal/framework/status/statusfakes/fake_k8s_updater.go new file mode 100644 index 000000000..8208947af --- /dev/null +++ b/internal/framework/status/statusfakes/fake_k8s_updater.go @@ -0,0 +1,117 @@ +// Code generated by counterfeiter. DO NOT EDIT. 
+package statusfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type FakeK8sUpdater struct { + UpdateStub func(context.Context, client.Object, ...client.SubResourceUpdateOption) error + updateMutex sync.RWMutex + updateArgsForCall []struct { + arg1 context.Context + arg2 client.Object + arg3 []client.SubResourceUpdateOption + } + updateReturns struct { + result1 error + } + updateReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeK8sUpdater) Update(arg1 context.Context, arg2 client.Object, arg3 ...client.SubResourceUpdateOption) error { + fake.updateMutex.Lock() + ret, specificReturn := fake.updateReturnsOnCall[len(fake.updateArgsForCall)] + fake.updateArgsForCall = append(fake.updateArgsForCall, struct { + arg1 context.Context + arg2 client.Object + arg3 []client.SubResourceUpdateOption + }{arg1, arg2, arg3}) + stub := fake.UpdateStub + fakeReturns := fake.updateReturns + fake.recordInvocation("Update", []interface{}{arg1, arg2, arg3}) + fake.updateMutex.Unlock() + if stub != nil { + return stub(arg1, arg2, arg3...) 
+ } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeK8sUpdater) UpdateCallCount() int { + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() + return len(fake.updateArgsForCall) +} + +func (fake *FakeK8sUpdater) UpdateCalls(stub func(context.Context, client.Object, ...client.SubResourceUpdateOption) error) { + fake.updateMutex.Lock() + defer fake.updateMutex.Unlock() + fake.UpdateStub = stub +} + +func (fake *FakeK8sUpdater) UpdateArgsForCall(i int) (context.Context, client.Object, []client.SubResourceUpdateOption) { + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() + argsForCall := fake.updateArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + +func (fake *FakeK8sUpdater) UpdateReturns(result1 error) { + fake.updateMutex.Lock() + defer fake.updateMutex.Unlock() + fake.UpdateStub = nil + fake.updateReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeK8sUpdater) UpdateReturnsOnCall(i int, result1 error) { + fake.updateMutex.Lock() + defer fake.updateMutex.Unlock() + fake.UpdateStub = nil + if fake.updateReturnsOnCall == nil { + fake.updateReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.updateReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeK8sUpdater) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.updateMutex.RLock() + defer fake.updateMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeK8sUpdater) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + 
 } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ status.K8sUpdater = new(FakeK8sUpdater) diff --git a/internal/framework/status/updater.go b/internal/framework/status/updater.go index 7ed2c08bd..d14bebfa4 100644 --- a/internal/framework/status/updater.go +++ b/internal/framework/status/updater.go @@ -2,16 +2,20 @@ package status import ( "context" + "errors" "fmt" "sync" + "time" "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/gateway-api/apis/v1beta1" ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller" ) //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Updater @@ -64,15 +68,11 @@ type UpdaterConfig struct { // (b) k8s API can become slow or even timeout. This will increase every update status API call. // Making UpdaterImpl asynchronous will prevent it from adding variable delays to the event loop. // -// (3) It doesn't retry on failures. This means there is a chance that some resources will not have up-to-do statuses. -// Statuses are important part of the Gateway API, so we need to ensure that the Gateway always keep the resources -// statuses up-to-date. -// -// (4) It doesn't clear the statuses of a resources that are no longer handled by the Gateway. For example, if +// (3) It doesn't clear the statuses of resources that are no longer handled by the Gateway. For example, if // an HTTPRoute resource no longer has the parentRef to the Gateway resources, the Gateway must update the status // of the resource to remove the status about the removed parentRef. 
// -// (5) If another controllers changes the status of the Gateway/HTTPRoute resource so that the information set by our +// (4) If another controller changes the status of the Gateway/HTTPRoute resource so that the information set by our // Gateway is removed, our Gateway will not restore the status until the EventLoop invokes the StatusUpdater as a // result of processing some other new change to a resource(s). // FIXME(pleshakov): Make updater production ready @@ -179,6 +179,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP if upd.cfg.UpdateGatewayClassStatus { for nsname, gcs := range statuses.GatewayClassStatuses { + select { + case <-ctx.Done(): + return + default: + } upd.writeStatuses(ctx, nsname, &v1beta1.GatewayClass{}, func(object client.Object) { gc := object.(*v1beta1.GatewayClass) gc.Status = prepareGatewayClassStatus(gcs, upd.cfg.Clock.Now()) @@ -188,6 +193,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP } for nsname, gs := range statuses.GatewayStatuses { + select { + case <-ctx.Done(): + return + default: + } upd.writeStatuses(ctx, nsname, &v1beta1.Gateway{}, func(object client.Object) { gw := object.(*v1beta1.Gateway) gw.Status = prepareGatewayStatus(gs, upd.cfg.PodIP, upd.cfg.Clock.Now()) @@ -200,7 +210,6 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP return default: } - upd.writeStatuses(ctx, nsname, &v1beta1.HTTPRoute{}, func(object client.Object) { hr := object.(*v1beta1.HTTPRoute) // statuses.GatewayStatus is never nil when len(statuses.HTTPRouteStatuses) > 0 @@ -219,26 +228,19 @@ func (upd *UpdaterImpl) writeStatuses( obj client.Object, statusSetter func(client.Object), ) { - // The function handles errors by reporting them in the logs. - // We need to get the latest version of the resource. - // Otherwise, the Update status API call can fail. 
- // Note: the default client uses a cache for reads, so we're not making an unnecessary API call here. - // the default is configurable in the Manager options. - if err := upd.cfg.Client.Get(ctx, nsname, obj); err != nil { - if !apierrors.IsNotFound(err) { - upd.cfg.Logger.Error( - err, - "Failed to get the recent version the resource when updating status", - "namespace", nsname.Namespace, - "name", nsname.Name, - "kind", obj.GetObjectKind().GroupVersionKind().Kind) - } - return - } - - statusSetter(obj) - - if err := upd.cfg.Client.Status().Update(ctx, obj); err != nil { + err := wait.ExponentialBackoffWithContext( + ctx, + wait.Backoff{ + Duration: time.Millisecond * 200, + Factor: 2, + Jitter: 0.5, + Steps: 4, + Cap: time.Millisecond * 3000, + }, + // Function returns true if the condition is satisfied, or an error if the loop should be aborted. + NewRetryUpdateFunc(upd.cfg.Client, upd.cfg.Client.Status(), nsname, obj, upd.cfg.Logger, statusSetter), + ) + if err != nil && !errors.Is(err, context.Canceled) { upd.cfg.Logger.Error( err, "Failed to update status", @@ -247,3 +249,57 @@ func (upd *UpdaterImpl) writeStatuses( "kind", obj.GetObjectKind().GroupVersionKind().Kind) } } + +// NewRetryUpdateFunc returns a function which will be used in wait.ExponentialBackoffWithContext. +// The function will attempt to Update a kubernetes resource and will be retried in +// wait.ExponentialBackoffWithContext if an error occurs. Exported for testing purposes. +// +// wait.ExponentialBackoffWithContext will retry if this function returns nil as its error, +// which is what we want if we encounter an error from the functions we call. However, +// the linter will complain if we return nil if an error was found. 
+// +//nolint:nilerr +func NewRetryUpdateFunc( + getter controller.Getter, + updater K8sUpdater, + nsname types.NamespacedName, + obj client.Object, + logger logr.Logger, + statusSetter func(client.Object), +) func(ctx context.Context) (bool, error) { + return func(ctx context.Context) (bool, error) { + // The function handles errors by reporting them in the logs. + // We need to get the latest version of the resource. + // Otherwise, the Update status API call can fail. + // Note: the default client uses a cache for reads, so we're not making an unnecessary API call here. + // the default is configurable in the Manager options. + if err := getter.Get(ctx, nsname, obj); err != nil { + // apierrors.IsNotFound(err) can happen when the resource is deleted, + // so no need to retry or return an error. + if apierrors.IsNotFound(err) { + return true, nil + } + logger.V(1).Info( + "Encountered error when getting resource to update status", + "error", err, + "namespace", nsname.Namespace, + "name", nsname.Name, + "kind", obj.GetObjectKind().GroupVersionKind().Kind) + return false, nil + } + + statusSetter(obj) + + if err := updater.Update(ctx, obj); err != nil { + logger.V(1).Info( + "Encountered error updating status", + "error", err, + "namespace", nsname.Namespace, + "name", nsname.Name, + "kind", obj.GetObjectKind().GroupVersionKind().Kind) + return false, nil + } + + return true, nil + } +} diff --git a/internal/framework/status/updater_retry_test.go b/internal/framework/status/updater_retry_test.go new file mode 100644 index 000000000..db4a23b6b --- /dev/null +++ b/internal/framework/status/updater_retry_test.go @@ -0,0 +1,75 @@ +package status_test + +import ( + "context" + "errors" + "testing" + + . 
"github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/gateway-api/apis/v1beta1" + + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller/controllerfakes" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status/statusfakes" +) + +func TestNewRetryUpdateFunc(t *testing.T) { + tests := []struct { + getReturns error + updateReturns error + name string + expConditionPassed bool + }{ + { + getReturns: errors.New("failed to get resource"), + updateReturns: nil, + name: "get fails", + expConditionPassed: false, + }, + { + getReturns: apierrors.NewNotFound(schema.GroupResource{}, "not found"), + updateReturns: nil, + name: "get fails and apierrors is not found", + expConditionPassed: true, + }, + { + getReturns: nil, + updateReturns: errors.New("failed to update resource"), + name: "update fails", + expConditionPassed: false, + }, + { + getReturns: nil, + updateReturns: nil, + name: "nothing fails", + expConditionPassed: true, + }, + } + + fakeStatusUpdater := &statusfakes.FakeK8sUpdater{} + fakeGetter := &controllerfakes.FakeGetter{} + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + g := NewWithT(t) + fakeStatusUpdater.UpdateReturns(test.updateReturns) + fakeGetter.GetReturns(test.getReturns) + f := status.NewRetryUpdateFunc( + fakeGetter, + fakeStatusUpdater, + types.NamespacedName{}, + &v1beta1.GatewayClass{}, + zap.New(), + func(client.Object) {}) + conditionPassed, err := f(context.Background()) + + // The function should always return nil. 
+ g.Expect(err).ToNot(HaveOccurred()) + g.Expect(conditionPassed).To(Equal(test.expConditionPassed)) + }) + } +} diff --git a/internal/framework/status/updater_test.go b/internal/framework/status/updater_test.go index 05b6af4ba..30b8d4127 100644 --- a/internal/framework/status/updater_test.go +++ b/internal/framework/status/updater_test.go @@ -331,7 +331,7 @@ var _ = Describe("Updater", func() { expectedGc := createExpectedGCWithGeneration(1) err := client.Get(context.Background(), types.NamespacedName{Name: gcName}, latestGc) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGc.ResourceVersion = latestGc.ResourceVersion // updating the status changes the ResourceVersion @@ -343,7 +343,7 @@ var _ = Describe("Updater", func() { expectedGw := createExpectedGwWithGeneration(1) err := client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -359,7 +359,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "ignored-gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -371,7 +371,7 @@ var _ = Describe("Updater", func() { expectedHR := createExpectedHR() err := client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "route1"}, latestHR) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedHR.ResourceVersion = latestHR.ResourceVersion @@ -391,14 +391,14 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "nginx-gateway", Name: "nginx-gateway-config"}, latestNG, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedNG.ResourceVersion = latestNG.ResourceVersion Expect(helpers.Diff(expectedNG, latestNG)).To(BeEmpty()) }) 
- It("should update statuses with canceled context - function normally returns", func() { + It("should not update Gateway API statuses with canceled context - function normally returns", func() { ctx, cancel := context.WithCancel(context.Background()) cancel() updater.Update(ctx, createGwAPIStatuses(generations{ @@ -408,28 +408,28 @@ var _ = Describe("Updater", func() { }) When("updating with canceled context", func() { - It("should have the updated status of GatewayClass in the API server", func() { + It("should not have the updated status of GatewayClass in the API server", func() { latestGc := &v1beta1.GatewayClass{} - expectedGc := createExpectedGCWithGeneration(2) + expectedGc := createExpectedGCWithGeneration(1) err := client.Get(context.Background(), types.NamespacedName{Name: gcName}, latestGc) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGc.ResourceVersion = latestGc.ResourceVersion Expect(helpers.Diff(expectedGc, latestGc)).To(BeEmpty()) }) - It("should have the updated status of Gateway in the API server", func() { + It("should not have the updated status of Gateway in the API server", func() { latestGw := &v1beta1.Gateway{} - expectedGw := createExpectedGwWithGeneration(2) + expectedGw := createExpectedGwWithGeneration(1) err := client.Get( context.Background(), types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -445,7 +445,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "ignored-gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -462,7 +462,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "route1"}, latestHR, ) - Expect(err).Should(Not(HaveOccurred())) + 
Expect(err).ToNot(HaveOccurred()) expectedHR.ResourceVersion = latestHR.ResourceVersion @@ -470,6 +470,31 @@ var _ = Describe("Updater", func() { Expect(helpers.Diff(expectedHR, latestHR)).To(BeEmpty()) }) }) + + It("should not update NginxGateway status with canceled context - function normally returns", func() { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + updater.Update(ctx, createNGStatus(2)) + }) + + When("updating with canceled context", func() { + It("should not have the updated status of the NginxGateway in the API server", func() { + latestNG := &ngfAPI.NginxGateway{} + expectedNG := createExpectedNGWithGeneration(1) + + err := client.Get( + context.Background(), + types.NamespacedName{Namespace: "nginx-gateway", Name: "nginx-gateway-config"}, + latestNG, + ) + Expect(err).ToNot(HaveOccurred()) + + expectedNG.ResourceVersion = latestNG.ResourceVersion + + Expect(helpers.Diff(expectedNG, latestNG)).To(BeEmpty()) + }) + }) + When("the Pod is not the current leader", func() { It("should not update any statuses", func() { updater.Disable() @@ -481,15 +506,15 @@ var _ = Describe("Updater", func() { It("should not have the updated status of Gateway in the API server", func() { latestGw := &v1beta1.Gateway{} - // testing that the generation has not changed from 2 to 3 - expectedGw := createExpectedGwWithGeneration(2) + // testing that the generation has not changed from 1 to 3 + expectedGw := createExpectedGwWithGeneration(1) err := client.Get( context.Background(), types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -505,7 +530,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "nginx-gateway", Name: "nginx-gateway-config"}, latestNG, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedNG.ResourceVersion = 
latestNG.ResourceVersion @@ -526,7 +551,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -542,7 +567,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "nginx-gateway", Name: "nginx-gateway-config"}, latestNG, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedNG.ResourceVersion = latestNG.ResourceVersion @@ -566,7 +591,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedGw.ResourceVersion = latestGw.ResourceVersion @@ -585,7 +610,7 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "nginx-gateway", Name: "nginx-gateway-config"}, latestNG, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) expectedNG.ResourceVersion = latestNG.ResourceVersion @@ -619,11 +644,11 @@ var _ = Describe("Updater", func() { types.NamespacedName{Namespace: "test", Name: "gateway"}, latestGw, ) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) - // Before this test there were 5 updates to the Gateway resource. - // So now the resource version should equal 25. - Expect(latestGw.ResourceVersion).To(Equal("25")) + // Before this test there were 4 updates to the Gateway resource. + // So now the resource version should equal 24. + Expect(latestGw.ResourceVersion).To(Equal("24")) }) }) }) @@ -676,7 +701,7 @@ var _ = Describe("Updater", func() { latestGc := &v1beta1.GatewayClass{} err := client.Get(context.Background(), types.NamespacedName{Name: gcName}, latestGc) - Expect(err).Should(Not(HaveOccurred())) + Expect(err).ToNot(HaveOccurred()) Expect(latestGc.Status).To(BeZero()) })