From 9b863786b5abc262987f3129b046a878511daec7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Burzy=C5=84ski?= Date: Thu, 23 May 2024 18:45:42 +0200 Subject: [PATCH] feat: generate and push fallback config on update failure (#6071) Makes KongClient try to recover from regular config update failures by generating a fallback configuration, excluding affected objects reported back by gateways. --- .github/workflows/_integration_tests.yaml | 6 + CHANGELOG.md | 1 + .../multi-gw/debug/manager_debug.yaml | 2 +- internal/dataplane/fallback/fallback.go | 12 +- internal/dataplane/fallback/fallback_test.go | 3 +- internal/dataplane/kong_client.go | 154 ++++++-- internal/dataplane/kong_client_test.go | 370 +++++++++++++----- internal/manager/run.go | 3 + 8 files changed, 435 insertions(+), 116 deletions(-) diff --git a/.github/workflows/_integration_tests.yaml b/.github/workflows/_integration_tests.yaml index 4b49fedb97..368cbd2d3b 100644 --- a/.github/workflows/_integration_tests.yaml +++ b/.github/workflows/_integration_tests.yaml @@ -118,6 +118,12 @@ jobs: # https://github.com/Kong/kubernetes-ingress-controller/issues/5127 is resolved. router-flavor: 'traditional' go_test_flags: -run=TestIngressRecoverFromInvalidPath + - name: postgres-fallback-config + test: postgres + feature_gates: "GatewayAlpha=true,FallbackConfiguration=true" + - name: dbless-fallback-config + test: dbless + feature_gates: "GatewayAlpha=true,FallbackConfiguration=true" # Experimental tests, in the future all integration tests will be migrated to them. # Set enterprise to 'true' to enable isolated integration test cases requiring enterprise features. - name: isolated-dbless diff --git a/CHANGELOG.md b/CHANGELOG.md index 98b6f2eb90..aa4f1c29f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,6 +141,7 @@ Adding a new version? You'll need three changes: [#5993](https://github.com/Kong/kubernetes-ingress-controller/pull/5993) [#6010](https://github.com/Kong/kubernetes-ingress-controller/pull/6010) [#6047](https://github.com/Kong/kubernetes-ingress-controller/pull/6047) + [#6071](https://github.com/Kong/kubernetes-ingress-controller/pull/6071) - Add support for Kubernetes Gateway API v1.1: - add a flag `--enable-controller-gwapi-grpcroute` to control whether enable or disable GRPCRoute controller. diff --git a/config/variants/multi-gw/debug/manager_debug.yaml b/config/variants/multi-gw/debug/manager_debug.yaml index 0aa6c60c93..2448e70441 100644 --- a/config/variants/multi-gw/debug/manager_debug.yaml +++ b/config/variants/multi-gw/debug/manager_debug.yaml @@ -30,7 +30,7 @@ spec: - /manager-debug - -- args: - - --feature-gates=GatewayAlpha=true + - --feature-gates=GatewayAlpha=true,FallbackConfiguration=true - --anonymous-reports=false env: - name: CONTROLLER_LOG_LEVEL diff --git a/internal/dataplane/fallback/fallback.go b/internal/dataplane/fallback/fallback.go index 35f31b16bc..c3643aacff 100644 --- a/internal/dataplane/fallback/fallback.go +++ b/internal/dataplane/fallback/fallback.go @@ -3,7 +3,10 @@ package fallback import ( "fmt" + "github.com/go-logr/logr" + "github.com/kong/kubernetes-ingress-controller/v3/internal/store" + "github.com/kong/kubernetes-ingress-controller/v3/internal/util" ) type CacheGraphProvider interface { @@ -14,11 +17,13 @@ type CacheGraphProvider interface { // Generator is responsible for generating fallback cache snapshots. 
type Generator struct { cacheGraphProvider CacheGraphProvider + logger logr.Logger } -func NewGenerator(cacheGraphProvider CacheGraphProvider) *Generator { +func NewGenerator(cacheGraphProvider CacheGraphProvider, logger logr.Logger) *Generator { return &Generator{ cacheGraphProvider: cacheGraphProvider, + logger: logger.WithName("fallback-generator"), } } @@ -46,6 +51,11 @@ func (g *Generator) GenerateExcludingAffected( if err := fallbackCache.Delete(obj); err != nil { return store.CacheStores{}, fmt.Errorf("failed to delete %s from the cache: %w", GetObjectHash(obj), err) } + g.logger.V(util.DebugLevel).Info("Excluded object from fallback cache", + "object_kind", obj.GetObjectKind(), + "object_name", obj.GetName(), + "object_namespace", obj.GetNamespace(), + ) } } diff --git a/internal/dataplane/fallback/fallback_test.go b/internal/dataplane/fallback/fallback_test.go index 1fc05c58ca..59201e9c3e 100644 --- a/internal/dataplane/fallback/fallback_test.go +++ b/internal/dataplane/fallback/fallback_test.go @@ -3,6 +3,7 @@ package fallback_test import ( "testing" + "github.com/go-logr/logr" "github.com/stretchr/testify/require" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback" @@ -53,7 +54,7 @@ func TestGenerator_GenerateExcludingAffected(t *testing.T) { require.NoError(t, err) graphProvider := &mockGraphProvider{graph: graph} - g := fallback.NewGenerator(graphProvider) + g := fallback.NewGenerator(graphProvider, logr.Discard()) t.Run("ingressClass is broken", func(t *testing.T) { fallbackCache, err := g.GenerateExcludingAffected(inputCacheStores, []fallback.ObjectHash{fallback.GetObjectHash(ingressClass)}) diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index e9f415673f..0d738b1027 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -30,6 +30,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/failures" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" @@ -59,6 +60,11 @@ type KongConfigBuilder interface { UpdateCache(store.CacheStores) } +// FallbackConfigGenerator generates a fallback configuration based on a cache snapshot and a set of broken objects. +type FallbackConfigGenerator interface { + GenerateExcludingAffected(store.CacheStores, []fallback.ObjectHash) (store.CacheStores, error) +} + // KongClient is a threadsafe high level API client for the Kong data-plane(s) // which parses Kubernetes object caches into Kong Admin configurations and // sends them as updates to the data-plane(s) (Kong Admin API). @@ -142,6 +148,9 @@ type KongClient struct { // currentConfigStatus is the current status of the configuration synchronisation. currentConfigStatus clients.ConfigStatus + + // fallbackConfigGenerator is used to generate a fallback configuration in case of sync failures. 
+ fallbackConfigGenerator FallbackConfigGenerator } // NewKongClient provides a new KongClient object after connecting to the @@ -159,22 +168,24 @@ func NewKongClient( kongConfigFetcher configfetcher.LastValidConfigFetcher, kongConfigBuilder KongConfigBuilder, cacheStores store.CacheStores, + fallbackConfigGenerator FallbackConfigGenerator, ) (*KongClient, error) { c := &KongClient{ - logger: logger, - requestTimeout: timeout, - diagnostic: diagnostic, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), - cache: &cacheStores, - kongConfig: kongConfig, - eventRecorder: eventRecorder, - dbmode: dbMode, - clientsProvider: clientsProvider, - configStatusNotifier: clients.NoOpConfigStatusNotifier{}, - updateStrategyResolver: updateStrategyResolver, - configChangeDetector: configChangeDetector, - kongConfigBuilder: kongConfigBuilder, - kongConfigFetcher: kongConfigFetcher, + logger: logger, + requestTimeout: timeout, + diagnostic: diagnostic, + prometheusMetrics: metrics.NewCtrlFuncMetrics(), + cache: &cacheStores, + kongConfig: kongConfig, + eventRecorder: eventRecorder, + dbmode: dbMode, + clientsProvider: clientsProvider, + configStatusNotifier: clients.NoOpConfigStatusNotifier{}, + updateStrategyResolver: updateStrategyResolver, + configChangeDetector: configChangeDetector, + kongConfigBuilder: kongConfigBuilder, + kongConfigFetcher: kongConfigFetcher, + fallbackConfigGenerator: fallbackConfigGenerator, } c.initializeControllerPodReference() @@ -400,8 +411,12 @@ func (c *KongClient) Update(ctx context.Context) error { // If FallbackConfiguration is enabled, we take a snapshot of the cache so that we operate on a consistent // set of resources in case of failures being returned from Kong. As we're going to generate a fallback config // based on the cache contents, we need to ensure it is not modified during the process. + var cacheSnapshot store.CacheStores if c.kongConfig.FallbackConfiguration { - cacheSnapshot, err := c.cache.TakeSnapshot() + // TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6080 + // Use TakeSnapshotIfChanged to avoid taking a snapshot if the cache hasn't changed. + var err error + cacheSnapshot, err = c.cache.TakeSnapshot() if err != nil { return fmt.Errorf("failed to take snapshot of cache: %w", err) } @@ -436,16 +451,11 @@ func (c *KongClient) Update(ctx context.Context) error { // In case of a failure in syncing configuration with Gateways, propagate the error. if gatewaysSyncErr != nil { - // TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/5931 - // In case of a failure in syncing configuration with Gateways, if FallbackConfiguration is enabled, - // we should generate a fallback configuration and push it to the gateways first. - if state, found := c.kongConfigFetcher.LastValidConfig(); found { - _, fallbackSyncErr := c.sendOutToGatewayClients(ctx, state, c.kongConfig) - if fallbackSyncErr != nil { - return errors.Join(gatewaysSyncErr, fallbackSyncErr) - } - c.logger.V(util.DebugLevel).Info("Due to errors in the current config, the last valid config has been pushed to Gateways") + if recoveringErr := c.tryRecoveringFromGatewaysSyncError(ctx, cacheSnapshot, gatewaysSyncErr); recoveringErr != nil { + return fmt.Errorf("failed to recover from gateways sync error: %w", recoveringErr) } + // Update result is positive only if gateways were successfully synced with the current config, so we still + // need to return the error here even if we succeeded recovering. 
return gatewaysSyncErr } @@ -464,6 +474,100 @@ func (c *KongClient) Update(ctx context.Context) error { return nil } +// tryRecoveringFromGatewaysSyncError tries to recover from a configuration rejection by: +// 1. Generating a fallback configuration and pushing it to the gateways if FallbackConfiguration feature is enabled. +// 2. Applying the last valid configuration to the gateways if FallbackConfiguration is disabled or fallback +// configuration generation fails. +func (c *KongClient) tryRecoveringFromGatewaysSyncError( + ctx context.Context, + cacheSnapshot store.CacheStores, + gatewaysSyncErr error, +) error { + // If configuration was rejected by the gateways and FallbackConfiguration is enabled, + // we should generate a fallback configuration and push it to the gateways. + if c.kongConfig.FallbackConfiguration { + recoveringErr := c.tryRecoveringWithFallbackConfiguration(ctx, cacheSnapshot, gatewaysSyncErr) + if recoveringErr == nil { + c.logger.Info("Successfully recovered from configuration rejection with fallback configuration") + return nil + } + // If we failed to recover using the fallback configuration, we should log the error and carry on. + c.logger.Error(recoveringErr, "Failed to recover from configuration rejection with fallback configuration") + } + + // If FallbackConfiguration is disabled, or we failed to recover using the fallback configuration, we should + // apply the last valid configuration to the gateways. + if state, found := c.kongConfigFetcher.LastValidConfig(); found { + if _, fallbackSyncErr := c.sendOutToGatewayClients(ctx, state, c.kongConfig); fallbackSyncErr != nil { + return errors.Join(gatewaysSyncErr, fallbackSyncErr) + } + c.logger.V(util.DebugLevel).Info("Due to errors in the current config, the last valid config has been pushed to Gateways") + } + return nil +} + +// tryRecoveringWithFallbackConfiguration tries to recover from a configuration rejection by generating a fallback +// configuration excluding affected objects from the cache. +func (c *KongClient) tryRecoveringWithFallbackConfiguration( + ctx context.Context, + cacheSnapshot store.CacheStores, + gatewaysSyncErr error, +) error { + // Extract the broken objects from the update error and generate a fallback configuration excluding them. + brokenObjects, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr) + if err != nil { + return fmt.Errorf("failed to extract broken objects from update error: %w", err) + } + fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingAffected( + cacheSnapshot, + brokenObjects, + ) + if err != nil { + return fmt.Errorf("failed to generate fallback configuration: %w", err) + } + + // Update the KongConfigBuilder with the fallback configuration and build the KongConfig. + c.kongConfigBuilder.UpdateCache(fallbackCache) + fallbackParsingResult := c.kongConfigBuilder.BuildKongConfig() + + // TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6081 + // Emit Kubernetes events depending on fallback configuration parsing result. + + // TODO: https://github.com/Kong/kubernetes-ingress-controller/issues/6082 + // Expose Prometheus metrics for fallback configuration parsing result. 
+ + _, gatewaysSyncErr = c.sendOutToGatewayClients(ctx, fallbackParsingResult.KongState, c.kongConfig) + if gatewaysSyncErr != nil { + return fmt.Errorf("failed to sync fallback configuration with gateways: %w", gatewaysSyncErr) + } + konnectSyncErr := c.maybeSendOutToKonnectClient(ctx, fallbackParsingResult.KongState, c.kongConfig) + if konnectSyncErr != nil { + // If Konnect sync fails, we should log the error and carry on as it's not a critical error. + c.logger.Error(konnectSyncErr, "Failed to sync fallback configuration with Konnect") + } + return nil +} + +// extractBrokenObjectsFromUpdateError. +func extractBrokenObjectsFromUpdateError(err error) ([]fallback.ObjectHash, error) { + var brokenObjects []client.Object + + var updateErr sendconfig.UpdateError + if ok := errors.As(err, &updateErr); !ok { + return nil, fmt.Errorf("expected UpdateError, cannot extract broken objects from %T", err) //nolint:errorlint + } + for _, resourceFailure := range updateErr.ResourceFailures() { + brokenObjects = append(brokenObjects, resourceFailure.CausingObjects()...) + } + if len(brokenObjects) == 0 { + return nil, fmt.Errorf("no broken objects found in UpdateError") + } + + return lo.Map(brokenObjects, func(obj client.Object, _ int) fallback.ObjectHash { + return fallback.GetObjectHash(obj) + }), nil +} + // sendOutToGatewayClients will generate deck content (config) from the provided kong state // and send it out to each of the configured gateway clients. func (c *KongClient) sendOutToGatewayClients( @@ -614,7 +718,7 @@ func (c *KongClient) sendToClient( if err := ctx.Err(); err != nil { logger.Error(err, "Exceeded Kong API timeout, consider increasing --proxy-timeout-seconds") } - return "", fmt.Errorf("performing update for %s failed: %w", client.BaseRootURL(), updateErr) + return "", fmt.Errorf("performing update for %s failed: %w", client.BaseRootURL(), err) } sendDiagnostic(false, nil) // No error occurred. 
// update the lastConfigSHA with the new updated checksum diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 1a1c07403b..f036414670 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -23,16 +23,19 @@ import ( corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kong/kubernetes-ingress-controller/v3/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v3/internal/annotations" "github.com/kong/kubernetes-ingress-controller/v3/internal/clients" dpconf "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/config" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configfetcher" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckgen" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/failures" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/kongstate" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" @@ -40,6 +43,8 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/store" "github.com/kong/kubernetes-ingress-controller/v3/internal/util" "github.com/kong/kubernetes-ingress-controller/v3/internal/versions" + kongv1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1" + "github.com/kong/kubernetes-ingress-controller/v3/test/helpers" "github.com/kong/kubernetes-ingress-controller/v3/test/mocks" ) @@ -169,16 +174,15 @@ func (p mockGatewayClientsProvider) GatewayClientsToConfigure() []*adminapi.Clie type mockUpdateStrategyResolver struct { updateCalledForURLs []string lastUpdatedContentForURLs map[string]sendconfig.ContentWithHash - shouldReturnErrorOnUpdate map[string]struct{} + errorsToReturnOnUpdate map[string][]error t *testing.T lock sync.RWMutex - singleError bool } func newMockUpdateStrategyResolver(t *testing.T) *mockUpdateStrategyResolver { return &mockUpdateStrategyResolver{ t: t, - shouldReturnErrorOnUpdate: map[string]struct{}{}, + errorsToReturnOnUpdate: map[string][]error{}, lastUpdatedContentForURLs: map[string]sendconfig.ContentWithHash{}, } } @@ -188,47 +192,58 @@ func (f *mockUpdateStrategyResolver) ResolveUpdateStrategy(c sendconfig.UpdateCl defer f.lock.Unlock() url := c.AdminAPIClient().BaseRootURL() - return &mockUpdateStrategy{onUpdate: f.updateCalledForURLCallback(url, f.singleError)} + return &mockUpdateStrategy{onUpdate: f.updateCalledForURLCallback(url)} } // returnErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return an error on Update(). -func (f *mockUpdateStrategyResolver) returnErrorOnUpdate(url string, shouldReturnErr bool) { +// Errors will be returned following FIFO order. Each call to this function adds a new error to the queue. 
+func (f *mockUpdateStrategyResolver) returnErrorOnUpdate(url string) {
 	f.lock.Lock()
 	defer f.lock.Unlock()
 
-	if shouldReturnErr {
-		f.shouldReturnErrorOnUpdate[url] = struct{}{}
-	} else {
-		delete(f.shouldReturnErrorOnUpdate, url)
-	}
+	f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], errors.New("error on update"))
+}
+
+// returnSpecificErrorOnUpdate will cause the mockUpdateStrategy with a given Admin API URL to return a specific error
+// on Update() call. Errors will be returned following FIFO order. Each call to this function adds a new error to the queue.
+func (f *mockUpdateStrategyResolver) returnSpecificErrorOnUpdate(url string, err error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	f.errorsToReturnOnUpdate[url] = append(f.errorsToReturnOnUpdate[url], err)
 }
 
 // updateCalledForURLCallback returns a function that will be called when the mockUpdateStrategy is called.
 // That enables us to track which URLs were called.
-func (f *mockUpdateStrategyResolver) updateCalledForURLCallback(url string, singleError bool) func(sendconfig.ContentWithHash) error {
+func (f *mockUpdateStrategyResolver) updateCalledForURLCallback(url string) func(sendconfig.ContentWithHash) error {
 	return func(content sendconfig.ContentWithHash) error {
 		f.lock.Lock()
 		defer f.lock.Unlock()
 
 		f.updateCalledForURLs = append(f.updateCalledForURLs, url)
-		if _, ok := f.shouldReturnErrorOnUpdate[url]; ok {
-			if singleError {
-				delete(f.shouldReturnErrorOnUpdate, url)
+		f.lastUpdatedContentForURLs[url] = content
+		if errsToReturn, ok := f.errorsToReturnOnUpdate[url]; ok {
+			if len(errsToReturn) > 0 {
+				err := errsToReturn[0]
+				f.errorsToReturnOnUpdate[url] = errsToReturn[1:]
+				return err
 			}
-			return errors.New("error on update")
+			return nil
 		}
-		f.lastUpdatedContentForURLs[url] = content
 		return nil
 	}
 }
 
 // assertUpdateCalledForURLs asserts that the mockUpdateStrategy was called for the given URLs.
-func (f *mockUpdateStrategyResolver) assertUpdateCalledForURLs(urls []string) {
+func (f *mockUpdateStrategyResolver) assertUpdateCalledForURLs(urls []string, msgAndArgs ...any) {
 	f.lock.RLock()
 	defer f.lock.RUnlock()
 
-	require.ElementsMatch(f.t, urls, f.updateCalledForURLs, "update was not called for all URLs")
+	if len(msgAndArgs) == 0 {
+		msgAndArgs = []any{"update was not called for all URLs"}
+	}
+	require.ElementsMatch(f.t, urls, f.updateCalledForURLs, msgAndArgs...)
 }
 
 func (f *mockUpdateStrategyResolver) assertNoUpdateCalled() {
@@ -274,6 +289,24 @@ func (m mockConfigurationChangeDetector) HasConfigurationChanged(
 	return m.hasConfigurationChanged, nil
 }
 
+// mockFallbackConfigGenerator is a mock implementation of the FallbackConfigGenerator interface.
+type mockFallbackConfigGenerator struct { + generateExcludingAffectedCalledWith lo.Tuple2[store.CacheStores, []fallback.ObjectHash] + generateExcludingAffectedResult store.CacheStores +} + +func newMockFallbackConfigGenerator() *mockFallbackConfigGenerator { + return &mockFallbackConfigGenerator{} +} + +func (m *mockFallbackConfigGenerator) GenerateExcludingAffected( + stores store.CacheStores, + hashes []fallback.ObjectHash, +) (store.CacheStores, error) { + m.generateExcludingAffectedCalledWith = lo.T2(stores, hashes) + return m.generateExcludingAffectedResult, nil +} + func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *testing.T) { var ( ctx = context.Background() @@ -345,7 +378,7 @@ func TestKongClientUpdate_AllExpectedClientsAreCalledAndErrorIsPropagated(t *tes } updateStrategyResolver := newMockUpdateStrategyResolver(t) for _, url := range tc.errorOnUpdateForURLs { - updateStrategyResolver.returnErrorOnUpdate(url, true) + updateStrategyResolver.returnErrorOnUpdate(url) } // always return true for HasConfigurationChanged to trigger an update configChangeDetector := mockConfigurationChangeDetector{ @@ -421,7 +454,7 @@ func (m *mockConfigStatusQueue) Notifications() []clients.ConfigStatus { type mockKongConfigBuilder struct { translationFailuresToReturn []failures.ResourceFailure kongState *kongstate.KongState - updateCacheCalled bool + updateCacheCalls []store.CacheStores } func newMockKongConfigBuilder() *mockKongConfigBuilder { @@ -437,12 +470,8 @@ func (p *mockKongConfigBuilder) BuildKongConfig() translator.KongConfigBuildingR } } -func (p *mockKongConfigBuilder) UpdateCache(store.CacheStores) { - p.updateCacheCalled = true -} - -func (p *mockKongConfigBuilder) IngressClassName() string { - return "kong" +func (p *mockKongConfigBuilder) UpdateCache(s store.CacheStores) { + p.updateCacheCalls = append(p.updateCacheCalls, s) } func (p *mockKongConfigBuilder) returnTranslationFailures(enabled bool) { @@ -477,72 +506,70 @@ func TestKongClientUpdate_ConfigStatusIsNotified(t *testing.T) { konnectClient: testKonnectClient, } - updateStrategyResolver = newMockUpdateStrategyResolver(t) - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} - configBuilder = newMockKongConfigBuilder() - kongRawStateGetter = &mockKongLastValidConfigFetcher{} - kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) + configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + configBuilder = newMockKongConfigBuilder() ) testCases := []struct { - name string - gatewayFailure bool - konnectFailure bool - translationFailures bool - expectedStatus clients.ConfigStatus + name string + gatewayFailuresCount int + konnectFailuresCount int + translationFailures bool + expectedStatus clients.ConfigStatus }{ { name: "success", - gatewayFailure: false, - konnectFailure: false, translationFailures: false, expectedStatus: clients.ConfigStatusOK, }, { - name: "gateway failure", - gatewayFailure: true, - konnectFailure: false, - translationFailures: false, - expectedStatus: clients.ConfigStatusApplyFailed, + name: "gateway failure", + gatewayFailuresCount: 2, + translationFailures: false, + expectedStatus: clients.ConfigStatusApplyFailed, }, { name: "translation failures", - gatewayFailure: false, - konnectFailure: false, translationFailures: true, expectedStatus: clients.ConfigStatusTranslationErrorHappened, }, { - name: "konnect failure", - gatewayFailure: 
false, - konnectFailure: true, - translationFailures: false, - expectedStatus: clients.ConfigStatusOKKonnectApplyFailed, + name: "konnect failure", + konnectFailuresCount: 2, + translationFailures: false, + expectedStatus: clients.ConfigStatusOKKonnectApplyFailed, }, { - name: "both gateway and konnect failure", - gatewayFailure: true, - konnectFailure: true, - translationFailures: false, - expectedStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, + name: "both gateway and konnect failure", + gatewayFailuresCount: 2, + konnectFailuresCount: 2, + translationFailures: false, + expectedStatus: clients.ConfigStatusApplyFailedKonnectApplyFailed, }, { - name: "translation failures and konnect failure", - gatewayFailure: false, - konnectFailure: true, - translationFailures: true, - expectedStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, + name: "translation failures and konnect failure", + konnectFailuresCount: 2, + translationFailures: true, + expectedStatus: clients.ConfigStatusTranslationErrorHappenedKonnectApplyFailed, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - // Reset the status queue. We want to make sure that the status is always notified. - statusQueue := newMockConfigStatusQueue() - kongClient.SetConfigStatusNotifier(statusQueue) + var ( + kongRawStateGetter = &mockKongLastValidConfigFetcher{} + updateStrategyResolver = newMockUpdateStrategyResolver(t) + statusQueue = newMockConfigStatusQueue() + kongClient = setupTestKongClient(t, updateStrategyResolver, clientsProvider, configChangeDetector, configBuilder, nil, kongRawStateGetter) + ) - updateStrategyResolver.returnErrorOnUpdate(testGatewayClient.BaseRootURL(), tc.gatewayFailure) - updateStrategyResolver.returnErrorOnUpdate(testKonnectClient.BaseRootURL(), tc.konnectFailure) + kongClient.SetConfigStatusNotifier(statusQueue) + for range tc.gatewayFailuresCount { + updateStrategyResolver.returnErrorOnUpdate(testGatewayClient.BaseRootURL()) + } + for range tc.konnectFailuresCount { + updateStrategyResolver.returnErrorOnUpdate(testKonnectClient.BaseRootURL()) + } configBuilder.returnTranslationFailures(tc.translationFailures) _ = kongClient.Update(ctx) @@ -705,6 +732,7 @@ func setupTestKongClient( kongRawStateGetter, configBuilder, store.NewCacheStores(), + newMockFallbackConfigGenerator(), ) require.NoError(t, err) return kongClient @@ -740,7 +768,6 @@ func mapClientsToUrls(clients mockGatewayClientsProvider) []string { type mockKongLastValidConfigFetcher struct { kongRawState *utils.KongRawState lastKongState *kongstate.KongState - status kong.Status } func (cf *mockKongLastValidConfigFetcher) LastValidConfig() (*kongstate.KongState, bool) { @@ -772,9 +799,8 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { }, } - updateStrategyResolver = newMockUpdateStrategyResolver(t) - configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} - lastKongRawState = &utils.KongRawState{ + configChangeDetector = mockConfigurationChangeDetector{hasConfigurationChanged: true} + lastKongRawState = &utils.KongRawState{ Services: []*kong.Service{ { Name: kong.String("last_service"), @@ -814,9 +840,8 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { testCases := []struct { name string - gatewayFailure bool translationFailures bool - singleError bool + gatewayFailuresCount int lastValidKongRawState *utils.KongRawState lastKongStatusHash string expectedLastKongState *kongstate.KongState @@ -830,16 +855,14 @@ func 
TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { }, { name: "no previous state, failure", - gatewayFailure: true, - singleError: true, + gatewayFailuresCount: 1, expectedLastKongState: nil, errorsSize: 1, lastKongStatusHash: sendconfig.WellKnownInitialHash, }, { name: "previous state, failure, fallback pushed with success", - gatewayFailure: true, - singleError: true, + gatewayFailuresCount: 1, lastValidKongRawState: lastKongRawState, expectedLastKongState: lastKongState, errorsSize: 1, @@ -847,8 +870,7 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { }, { name: "previous state, failure, fallback pushed with failure", - gatewayFailure: true, - singleError: false, + gatewayFailuresCount: 2, lastValidKongRawState: lastKongRawState, expectedLastKongState: lastKongState, errorsSize: 3, @@ -858,13 +880,14 @@ func TestKongClientUpdate_FetchStoreAndPushLastValidConfig(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - updateStrategyResolver.returnErrorOnUpdate(clientsProvider.gatewayClients[0].BaseRootURL(), tc.gatewayFailure) - updateStrategyResolver.returnErrorOnUpdate(clientsProvider.gatewayClients[1].BaseRootURL(), tc.gatewayFailure) + updateStrategyResolver := newMockUpdateStrategyResolver(t) + for range tc.gatewayFailuresCount { + updateStrategyResolver.returnErrorOnUpdate(clientsProvider.gatewayClients[0].BaseRootURL()) + updateStrategyResolver.returnErrorOnUpdate(clientsProvider.gatewayClients[1].BaseRootURL()) + } - updateStrategyResolver.singleError = tc.singleError configChangeDetector.status.ConfigurationHash = tc.lastKongStatusHash kongRawStateGetter := &mockKongLastValidConfigFetcher{ - status: configChangeDetector.status, kongRawState: tc.lastValidKongRawState, } kongClient := setupTestKongClient( @@ -932,15 +955,37 @@ func TestKongClientUpdate_KonnectUpdatesAreSanitized(t *testing.T) { require.Equal(t, "{vault://redacted-value}", *cert.Key, "expected Konnect to have redacted certificate key") } -func TestKongClient_FallbackConfiguration(t *testing.T) { +func TestKongClient_FallbackConfiguration_SuccessfulRecovery(t *testing.T) { ctx := context.Background() + gwClient := mustSampleGatewayClient(t) + konnectClient := mustSampleKonnectClient(t) clientsProvider := mockGatewayClientsProvider{ - gatewayClients: []*adminapi.Client{mustSampleGatewayClient(t)}, + gatewayClients: []*adminapi.Client{gwClient}, + konnectClient: konnectClient, } updateStrategyResolver := newMockUpdateStrategyResolver(t) configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} configBuilder := newMockKongConfigBuilder() - kongRawStateGetter := &mockKongLastValidConfigFetcher{} + lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} + fallbackConfigGenerator := newMockFallbackConfigGenerator() + + // We'll use KongConsumer as an example of a broken object, but it could be any supported type + // for the purpose of this test as the fallback config generator is mocked anyway. 
+ someConsumer := func(name string) *kongv1.KongConsumer { + return &kongv1.KongConsumer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + Annotations: map[string]string{ + annotations.IngressClassKey: annotations.DefaultIngressClass, + }, + }, + Username: name, + } + } + validConsumer := someConsumer("valid") + brokenConsumer := someConsumer("broken") + originalCache := cacheStoresFromObjs(t, validConsumer, brokenConsumer) kongClient, err := NewKongClient( zapr.NewLogger(zap.NewNop()), time.Second, @@ -953,12 +998,161 @@ func TestKongClient_FallbackConfiguration(t *testing.T) { clientsProvider, updateStrategyResolver, configChangeDetector, - kongRawStateGetter, + lastValidConfigFetcher, configBuilder, - store.NewCacheStores(), + originalCache, + fallbackConfigGenerator, ) require.NoError(t, err) - require.NoError(t, kongClient.Update(ctx)) - require.True(t, configBuilder.updateCacheCalled, "expected store to be updated with a snapshot") + t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") + updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", brokenConsumer)), + }, + errors.New("error on update"), + )) + + t.Log("Setting the config builder to return KongState with the valid consumer only") + configBuilder.kongState = &kongstate.KongState{ + Consumers: []kongstate.Consumer{ + { + Consumer: kong.Consumer{ + Username: lo.ToPtr(validConsumer.Username), + }, + }, + }, + } + + t.Log("Setting the fallback config generator to return a snapshot excluding the broken consumer") + fallbackCacheStoresToBeReturned := cacheStoresFromObjs(t, validConsumer) + fallbackConfigGenerator.generateExcludingAffectedResult = fallbackCacheStoresToBeReturned + + t.Log("Calling KongClient.Update") + err = kongClient.Update(ctx) + require.Error(t, err) + + t.Log("Verifying that the config builder cache was updated twice") + require.Len(t, configBuilder.updateCacheCalls, 2, + "expected cache to be updated with a snapshot twice: first with the initial cache snapshot, then with the fallback one") + + t.Log("Verifying that the first cache update contains both consumers") + firstCacheUpdate := configBuilder.updateCacheCalls[0] + require.NotEqual(t, originalCache, firstCacheUpdate, "expected cache to be updated with a new snapshot") + _, hasConsumer, err := firstCacheUpdate.Consumer.Get(brokenConsumer) + require.NoError(t, err) + require.True(t, hasConsumer, "expected consumer to be in the first cache snapshot") + + t.Log("Verifying that the fallback config generator was called with the first cache snapshot and the broken object hash") + expectedGenerateExcludingAffectedArgs := lo.T2(firstCacheUpdate, []fallback.ObjectHash{fallback.GetObjectHash(brokenConsumer)}) + require.Equal(t, expectedGenerateExcludingAffectedArgs, fallbackConfigGenerator.generateExcludingAffectedCalledWith, + "expected fallback config generator to be called with the first cache snapshot and the broken object hash") + + t.Log("Verifying that the second config builder cache update contains the fallback snapshot") + secondCacheUpdate := configBuilder.updateCacheCalls[1] + require.Equal(t, fallbackCacheStoresToBeReturned, secondCacheUpdate, + "expected cache to be updated with the fallback snapshot on second call") + + t.Log("Verifying that the update strategy was called twice for gateway and Konnect") + 
updateStrategyResolver.assertUpdateCalledForURLs( + []string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), + }, + "expected update to be called twice: first with the initial config, then with the fallback one", + ) + + t.Log("Verifying that the last valid config is updated with the config excluding the broken consumer") + lastValidConfig, _ := lastValidConfigFetcher.LastValidConfig() + require.Len(t, lastValidConfig.Consumers, 1) + require.Equal(t, validConsumer.Username, *lastValidConfig.Consumers[0].Username) +} + +func TestKongClient_FallbackConfiguration_FailedRecovery(t *testing.T) { + ctx := context.Background() + gwClient := mustSampleGatewayClient(t) + konnectClient := mustSampleKonnectClient(t) + clientsProvider := mockGatewayClientsProvider{ + gatewayClients: []*adminapi.Client{gwClient}, + konnectClient: konnectClient, + } + updateStrategyResolver := newMockUpdateStrategyResolver(t) + configChangeDetector := mockConfigurationChangeDetector{hasConfigurationChanged: true} + configBuilder := newMockKongConfigBuilder() + lastValidConfigFetcher := &mockKongLastValidConfigFetcher{} + fallbackConfigGenerator := newMockFallbackConfigGenerator() + + // We'll use KongConsumer as an example of a broken object, but it could be any supported type + // for the purpose of this test as the fallback config generator is mocked anyway. + someConsumer := func(name string) *kongv1.KongConsumer { + return &kongv1.KongConsumer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "namespace", + Annotations: map[string]string{ + annotations.IngressClassKey: annotations.DefaultIngressClass, + }, + }, + Username: name, + } + } + brokenConsumer := someConsumer("broken") + originalCache := cacheStoresFromObjs(t, brokenConsumer) + kongClient, err := NewKongClient( + zapr.NewLogger(zap.NewNop()), + time.Second, + util.ConfigDumpDiagnostic{}, + sendconfig.Config{ + FallbackConfiguration: true, + }, + mocks.NewEventRecorder(), + dpconf.DBModeOff, + clientsProvider, + updateStrategyResolver, + configChangeDetector, + lastValidConfigFetcher, + configBuilder, + originalCache, + fallbackConfigGenerator, + ) + require.NoError(t, err) + + t.Log("Setting update strategy to return an error on the first call to trigger fallback configuration generation") + updateStrategyResolver.returnSpecificErrorOnUpdate(gwClient.BaseRootURL(), sendconfig.NewUpdateError( + []failures.ResourceFailure{ + lo.Must(failures.NewResourceFailure("violated constraint", brokenConsumer)), + }, + errors.New("error on update"), + )) + + t.Log("Setting update strategy to return an error on the second call (fallback) to trigger a failed recovery") + updateStrategyResolver.returnErrorOnUpdate(gwClient.BaseRootURL()) + + t.Log("Calling KongClient.Update") + err = kongClient.Update(ctx) + require.Error(t, err) + + t.Log("Verifying that the update strategy was called twice for gateway, skipping Konnect on fallback failure") + updateStrategyResolver.assertUpdateCalledForURLs( + []string{ + gwClient.BaseRootURL(), konnectClient.BaseRootURL(), + gwClient.BaseRootURL(), + }, + "expected update to be called twice: first with the initial config, then with the fallback one", + ) + + t.Log("Verifying that the last valid config is empty") + _, hasLastValidConfig := lastValidConfigFetcher.LastValidConfig() + require.False(t, hasLastValidConfig, "expected no last valid config to be stored as no successful recovery happened") +} + +func cacheStoresFromObjs(t *testing.T, objs 
...runtime.Object) store.CacheStores { + for i := range objs { + obj := objs[i].(client.Object) + obj = helpers.WithTypeMeta(t, obj) + objs[i] = obj + } + s, err := store.NewCacheStoresFromObjs(objs...) + require.NoError(t, err) + return s } diff --git a/internal/manager/run.go b/internal/manager/run.go index 741e3c15a4..a56b4ba129 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -26,6 +26,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane" dpconf "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/config" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/configfetcher" + "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/fallback" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/sendconfig" "github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/translator" "github.com/kong/kubernetes-ingress-controller/v3/internal/gatewayapi" @@ -191,6 +192,7 @@ func Run( updateStrategyResolver := sendconfig.NewDefaultUpdateStrategyResolver(kongConfig, logger) configurationChangeDetector := sendconfig.NewDefaultConfigurationChangeDetector(logger) kongConfigFetcher := configfetcher.NewDefaultKongLastGoodConfigFetcher(translatorFeatureFlags.FillIDs, c.KongWorkspace) + fallbackConfigGenerator := fallback.NewGenerator(fallback.NewDefaultCacheGraphProvider(), logger) dataplaneClient, err := dataplane.NewKongClient( logger, time.Duration(c.ProxyTimeoutSeconds*float32(time.Second)), @@ -204,6 +206,7 @@ func Run( kongConfigFetcher, configTranslator, cache, + fallbackConfigGenerator, ) if err != nil { return fmt.Errorf("failed to initialize kong data-plane client: %w", err)
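
For review convenience, the control flow this patch adds to internal/dataplane/kong_client.go can be summarized with the sketch below. It is illustrative only: updateWithFallback is a hypothetical helper that does not exist in the patch (the real logic lives in KongClient.Update, tryRecoveringFromGatewaysSyncError, and tryRecoveringWithFallbackConfiguration), and the FallbackConfiguration feature-gate check, Konnect pushes, diagnostics, metrics, and most error handling are elided. All field, method, and type names are taken from the diff above.

// Condensed sketch of the recovery path added by this patch (assumes it lives
// in the dataplane package; updateWithFallback itself is hypothetical).
func (c *KongClient) updateWithFallback(ctx context.Context) error {
	// Snapshot the cache so that fallback generation operates on a consistent view.
	cacheSnapshot, err := c.cache.TakeSnapshot()
	if err != nil {
		return err
	}
	c.kongConfigBuilder.UpdateCache(cacheSnapshot)
	parsingResult := c.kongConfigBuilder.BuildKongConfig()

	_, gatewaysSyncErr := c.sendOutToGatewayClients(ctx, parsingResult.KongState, c.kongConfig)
	if gatewaysSyncErr == nil {
		return nil
	}

	// First, try a fallback configuration that excludes the objects the gateways rejected.
	if broken, err := extractBrokenObjectsFromUpdateError(gatewaysSyncErr); err == nil {
		if fallbackCache, err := c.fallbackConfigGenerator.GenerateExcludingAffected(cacheSnapshot, broken); err == nil {
			c.kongConfigBuilder.UpdateCache(fallbackCache)
			fallbackResult := c.kongConfigBuilder.BuildKongConfig()
			if _, err := c.sendOutToGatewayClients(ctx, fallbackResult.KongState, c.kongConfig); err == nil {
				// Recovered, but the original rejection is still surfaced to the caller.
				return gatewaysSyncErr
			}
		}
	}

	// Otherwise, push the last valid configuration, as the controller already did before this patch.
	if state, found := c.kongConfigFetcher.LastValidConfig(); found {
		_, _ = c.sendOutToGatewayClients(ctx, state, c.kongConfig)
	}
	return gatewaysSyncErr
}

Note that the caller still receives the original sync error even after a successful fallback push, matching the comment in Update that the result is positive only when the current (non-fallback) configuration was applied.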