diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go
index 8d2aea6f..b35d9df3 100644
--- a/pkg/api/handlers.go
+++ b/pkg/api/handlers.go
@@ -864,6 +864,6 @@ func getStorageClass(cache *resources.Cache) func(w http.ResponseWriter, r *http
 // @Produce json
 // @Success 200
 // @Router /health [get]
-func checkClusteConnection(k8sSession *session.K8sSession, disconnected chan error) http.HandlerFunc {
-	return session.MonitorConnection(k8sSession, disconnected)
+func checkClusteConnection(k8sSession *session.K8sSession) http.HandlerFunc {
+	return k8sSession.ServeConnStatus()
 }
diff --git a/pkg/api/resources/cache.go b/pkg/api/resources/cache.go
index b14e12b6..0ba3f503 100644
--- a/pkg/api/resources/cache.go
+++ b/pkg/api/resources/cache.go
@@ -114,7 +114,7 @@ func NewCache(ctx context.Context, clients *client.Clients) (*Cache, error) {
 	go c.factory.Start(c.stopper)
 	go c.dynamicFactory.Start(c.stopper)
 
-	// Wait for the pod cache to sync as they it is required for metrics collection
+	// Wait for the pod cache to sync as it is required for metrics collection
 	if !cache.WaitForCacheSync(ctx.Done(), c.Pods.HasSynced) {
 		return nil, fmt.Errorf("timed out waiting for caches to sync")
 	}
diff --git a/pkg/api/start.go b/pkg/api/start.go
index b29a0419..4056eadd 100644
--- a/pkg/api/start.go
+++ b/pkg/api/start.go
@@ -20,7 +20,6 @@ import (
 	udsMiddleware "github.com/defenseunicorns/uds-runtime/pkg/api/middleware"
 	"github.com/defenseunicorns/uds-runtime/pkg/api/monitor"
 	"github.com/defenseunicorns/uds-runtime/pkg/api/resources"
-	"github.com/defenseunicorns/uds-runtime/pkg/k8s/client"
 	"github.com/defenseunicorns/uds-runtime/pkg/k8s/session"
 	"github.com/go-chi/chi/v5"
 	"github.com/go-chi/chi/v5/middleware"
@@ -45,17 +44,14 @@ func Setup(assets *embed.FS) (*chi.Mux, bool, error) {
 
 	inCluster := k8sSession.InCluster
 
-	// Create the disconnected channel
-	disconnected := make(chan error)
-
 	if !inCluster {
 		apiAuth, token, err = checkForLocalAuth()
 		if err != nil {
 			return nil, inCluster, fmt.Errorf("failed to set auth: %w", err)
 		}
 
-		// Start the reconnection goroutine
-		go k8sSession.HandleReconnection(disconnected, client.NewClient, resources.NewCache)
+		// Start the cluster monitoring goroutine
+		go k8sSession.StartClusterMonitoring()
 	}
 
 	authSVC := checkForClusterAuth()
@@ -83,7 +79,7 @@ func Setup(assets *embed.FS) (*chi.Mux, bool, error) {
 		http.Redirect(w, r, "/swagger/index.html", http.StatusMovedPermanently)
 	})
 	r.Get("/swagger/*", httpSwagger.WrapHandler)
-	r.Get("/health", checkClusteConnection(k8sSession, disconnected))
+	r.Get("/health", checkClusteConnection(k8sSession))
 	r.Route("/api/v1", func(r chi.Router) {
 		// Require a valid token for API calls
 		if apiAuth {
diff --git a/pkg/k8s/session/session.go b/pkg/k8s/session/session.go
index 3875f23c..b9f0fbb5 100644
--- a/pkg/k8s/session/session.go
+++ b/pkg/k8s/session/session.go
@@ -2,7 +2,6 @@ package session
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"log"
 	"net/http"
@@ -18,10 +17,14 @@ import (
 type K8sSession struct {
 	Clients        *client.Clients
 	Cache          *resources.Cache
+	Cancel         context.CancelFunc
 	CurrentCtx     string
 	CurrentCluster string
-	Cancel         context.CancelFunc
+	Status         chan string
 	InCluster      bool
+	ready          bool
+	createCache    createCache
+	createClient   createClient
 }
 
 type createClient func() (*client.Clients, error)
@@ -64,54 +67,82 @@ func CreateK8sSession() (*K8sSession, error) {
 		CurrentCluster: currentCluster,
 		Cancel:         cancel,
 		InCluster:      inCluster,
+		Status:         make(chan string),
+		ready:          true,
+		createCache:    resources.NewCache,
+		createClient:   client.NewClient,
 	}
 
 	return session, nil
 }
 
-// HandleReconnection is a goroutine that handles reconnection to the k8s API
-// passing createClient and createCache instead of calling clients.NewClient and resources.NewCache for testing purposes
-func (ks *K8sSession) HandleReconnection(disconnected chan error, createClient createClient,
-	createCache createCache) {
-	for err := range disconnected {
-		log.Printf("Disconnected error received: %v\n", err)
-		for {
-			// Cancel the previous context
-			ks.Cancel()
-			time.Sleep(getRetryInterval())
-
-			currentCtx, currentCluster, err := client.GetCurrentContext()
-			if err != nil {
-				log.Printf("Error fetching current context: %v\n", err)
-				continue
-			}
+// StartClusterMonitoring is a goroutine that checks the connection to the cluster
+func (ks *K8sSession) StartClusterMonitoring() {
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
 
-			// If the current context or cluster is different from the original, skip reconnection
-			if currentCtx != ks.CurrentCtx || currentCluster != ks.CurrentCluster {
-				log.Println("Current context has changed. Skipping reconnection.")
-				continue
-			}
+	for range ticker.C {
+		// Skip if not ready, prevents race conditions using new cache
+		if !ks.ready {
+			continue
+		}
+		// Perform cluster health check
+		_, err := ks.Clients.Clientset.ServerVersion()
+		if err != nil {
+			ks.Status <- "error"
+			ks.HandleReconnection()
+		} else {
+			ks.Status <- "success"
+		}
+	}
+}
 
-			k8sClient, err := createClient()
-			if err != nil {
-				log.Printf("Retrying to create k8s client: %v\n", err)
-				continue
-			}
+// HandleReconnection infinitely retries to re-create the client and cache of the formerly connected cluster
+func (ks *K8sSession) HandleReconnection() {
+	log.Println("Disconnected error received")
 
-			// Create a new context and cache
-			ctx, cancel := context.WithCancel(context.Background())
-			cache, err := createCache(ctx, k8sClient)
-			if err != nil {
-				log.Printf("Retrying to create cache: %v\n", err)
-				continue
-			}
+	// Set ready to false to block cluster check ticker
+	ks.ready = false
 
-			ks.Clients = k8sClient
-			ks.Cache = cache
-			ks.Cancel = cancel
-			log.Println("Successfully reconnected to k8s and recreated cache")
-			break
+	for {
+		// Cancel the previous context
+		ks.Cancel()
+		time.Sleep(getRetryInterval())
+
+		currentCtx, currentCluster, err := client.GetCurrentContext()
+		if err != nil {
+			log.Printf("Error fetching current context: %v\n", err)
+			continue
 		}
+
+		// If the current context or cluster is different from the original, skip reconnection
+		if currentCtx != ks.CurrentCtx || currentCluster != ks.CurrentCluster {
+			log.Println("Current context has changed. Skipping reconnection.")
+			continue
+		}
+
+		k8sClient, err := ks.createClient()
+		if err != nil {
+			log.Printf("Retrying to create k8s client: %v\n", err)
+			continue
+		}
+
+		// Create a new context and cache
+		ctx, cancel := context.WithCancel(context.Background())
+		cache, err := ks.createCache(ctx, k8sClient)
+		if err != nil {
+			log.Printf("Retrying to create cache: %v\n", err)
+			continue
+		}
+
+		ks.Clients = k8sClient
+		ks.Cache = cache
+		ks.Cancel = cancel
+
+		ks.ready = true
+		log.Println("Successfully reconnected to cluster and recreated cache")
+
+		break
 	}
 }
 
@@ -126,83 +157,45 @@ func getRetryInterval() time.Duration {
 	return 5 * time.Second // Default to 5 seconds if not set
 }
 
-func MonitorConnection(k8sSession *K8sSession, disconnected chan error) http.HandlerFunc {
+// ServeConnStatus returns a handler function that streams the connection status to the client
+func (ks *K8sSession) ServeConnStatus() http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		// Set headers to keep connection alive
 		rest.WriteHeaders(w)
 
-		ticker := time.NewTicker(30 * time.Second)
-		defer ticker.Stop()
-
-		recovering := false
-
-		// Function to check the cluster health when running out of cluster
-		checkCluster := func() {
-			versionInfo, err := k8sSession.Clients.Clientset.ServerVersion()
-			response := map[string]string{}
-
-			// if err then connection is lost
-			if err != nil {
-				response["error"] = err.Error()
-				w.WriteHeader(http.StatusInternalServerError)
-				disconnected <- err
-				// indicate that the reconnection handler should have been triggered by the disconnected channel
-				recovering = true
-			} else if recovering {
-				// if errors are resolved, send a reconnected message
-				response["reconnected"] = versionInfo.String()
-				recovering = false
-			} else {
-				response["success"] = versionInfo.String()
-				w.WriteHeader(http.StatusOK)
-			}
-
-			data, err := json.Marshal(response)
-			if err != nil {
-				http.Error(w, fmt.Sprintf("data: Error: %v\n\n", err), http.StatusInternalServerError)
-				return
-			}
-
-			// Write the data to the response
-			fmt.Fprintf(w, "data: %s\n\n", data)
+		flusher, ok := w.(http.Flusher)
+		if !ok {
+			http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
+			return
+		}
 
-			// Flush the response to ensure it is sent to the client
-			if flusher, ok := w.(http.Flusher); ok {
-				flusher.Flush()
-			}
+		// If running in cluster don't check connection
+		if ks.InCluster {
+			fmt.Fprint(w, "event: close\ndata: in-cluster\n\n")
+			flusher.Flush()
 		}
 
-		// If running in cluster don't check for version and send error or reconnected events
-		if k8sSession.InCluster {
-			checkCluster = func() {
-				response := map[string]string{
-					"success": "in-cluster",
-				}
-				data, err := json.Marshal(response)
-				if err != nil {
-					http.Error(w, fmt.Sprintf("data: Error: %v\n\n", err), http.StatusInternalServerError)
-					return
-				}
-				// Write the data to the response
-				fmt.Fprintf(w, "data: %s\n\n", data)
-
-				// Flush the response to ensure it is sent to the client
-				if flusher, ok := w.(http.Flusher); ok {
-					flusher.Flush()
-				}
-			}
+		sendStatus := func(msg string) {
+			fmt.Fprintf(w, "data: %s\n\n", msg)
+			flusher.Flush()
 		}
 
-		// Check the cluster immediately
-		checkCluster()
+		// To mitigate timing between connection start and getting status updates, immediately check cluster connection
+		_, err := ks.Clients.Clientset.ServerVersion()
+		if err != nil {
+			sendStatus("error")
+		} else {
+			sendStatus("success")
+		}
 
+		// Listen for updates and send them to the client
 		for {
 			select {
-			case <-ticker.C:
-				checkCluster()
+			case msg := <-ks.Status:
+				sendStatus(msg)
 			case <-r.Context().Done():
-				// Client closed the connection
+				// Client disconnected
 				return
 			}
 		}
diff --git a/pkg/k8s/session/session_test.go b/pkg/k8s/session/session_test.go
index cee95c36..ec2469e1 100644
--- a/pkg/k8s/session/session_test.go
+++ b/pkg/k8s/session/session_test.go
@@ -1,5 +1,3 @@
-//go:build unit
-
 package session
 
 import (
@@ -24,31 +22,29 @@ func TestHandleReconnection(t *testing.T) {
 		return "original-context", "original-cluster", nil
 	}
 
+	createClientMock := func() (*client.Clients, error) {
+		return &client.Clients{Clientset: &kubernetes.Clientset{}}, nil
+	}
+
+	createCacheMock := func(ctx context.Context, client *client.Clients) (*resources.Cache, error) {
+		return &resources.Cache{Pods: &resources.ResourceList{}}, nil
+	}
+
 	k8sSession := &K8sSession{
 		Clients:        &client.Clients{},
 		Cache:          &resources.Cache{},
 		Cancel:         func() {},
 		CurrentCtx:     "original-context",
 		CurrentCluster: "original-cluster",
+		createCache:    createCacheMock,
+		createClient:   createClientMock,
 	}
 
 	require.Nil(t, k8sSession.Clients.Clientset)
 	require.Nil(t, k8sSession.Cache.Pods)
 
-	createClientMock := func() (*client.Clients, error) {
-		return &client.Clients{Clientset: &kubernetes.Clientset{}}, nil
-	}
-
-	createCacheMock := func(ctx context.Context, client *client.Clients) (*resources.Cache, error) {
-		return &resources.Cache{Pods: &resources.ResourceList{}}, nil
-	}
-
-	disconnected := make(chan error, 1)
-
-	disconnected <- fmt.Errorf("simulated disconnection")
-
 	// Run the handleReconnection function in a goroutine
-	go k8sSession.HandleReconnection(disconnected, createClientMock, createCacheMock)
+	go k8sSession.HandleReconnection()
 
 	// Wait for the reconnection logic to complete
 	time.Sleep(200 * time.Millisecond)
@@ -56,8 +52,6 @@ func TestHandleReconnection(t *testing.T) {
 	// Verify that the K8sResources struct was updated
 	require.NotNil(t, k8sSession.Clients.Clientset)
 	require.NotNil(t, k8sSession.Cache.Pods)
-
-	close(disconnected)
 }
 
 // Test createClient returns an error
@@ -65,14 +59,6 @@ func TestHandleReconnectionCreateClientError(t *testing.T) {
 	os.Setenv("CONNECTION_RETRY_MS", "100")
 	defer os.Unsetenv("CONNECTION_RETRY_MS")
 
-	k8sSession := &K8sSession{
-		Clients:        &client.Clients{},
-		Cache:          &resources.Cache{},
-		Cancel:         func() {},
-		CurrentCtx:     "original-context",
-		CurrentCluster: "original-cluster",
-	}
-
 	// Mock GetCurrentContext to return the same context and cluster as the original
 	client.GetCurrentContext = func() (string, string, error) {
 		return "original-context", "original-cluster", nil
@@ -87,19 +73,24 @@ func TestHandleReconnectionCreateClientError(t *testing.T) {
 		return &resources.Cache{Pods: &resources.ResourceList{}}, nil
 	}
 
-	disconnected := make(chan error, 1)
-	disconnected <- fmt.Errorf("simulated disconnection")
+	k8sSession := &K8sSession{
+		Clients:        &client.Clients{},
+		Cache:          &resources.Cache{},
+		Cancel:         func() {},
+		CurrentCtx:     "original-context",
+		CurrentCluster: "original-cluster",
+		createCache:    createCacheMock,
+		createClient:   createClientMock,
+	}
 
 	// Run the handleReconnection function in a goroutine
-	go k8sSession.HandleReconnection(disconnected, createClientMock, createCacheMock)
+	go k8sSession.HandleReconnection()
 
 	// Wait for the reconnection logic to attempt creating the client
 	time.Sleep(200 * time.Millisecond)
 
 	require.Nil(t, k8sSession.Clients.Clientset)
 	require.Nil(t, k8sSession.Cache.Pods)
-
-	close(disconnected)
 }
 
 // Test createCache returns an error
@@ -107,14 +98,6 @@ func TestHandleReconnectionCreateCacheError(t *testing.T) {
 	os.Setenv("CONNECTION_RETRY_MS", "100")
 	defer os.Unsetenv("CONNECTION_RETRY_MS")
 
-	k8sSession := &K8sSession{
-		Clients:        &client.Clients{},
-		Cache:          &resources.Cache{},
-		Cancel:         func() {},
-		CurrentCtx:     "original-context",
-		CurrentCluster: "original-cluster",
-	}
-
 	// Mock GetCurrentContext to return the same context and cluster as the original
 	client.GetCurrentContext = func() (string, string, error) {
 		return "original-context", "original-cluster", nil
@@ -128,12 +111,18 @@ func TestHandleReconnectionCreateCacheError(t *testing.T) {
 		return nil, fmt.Errorf("failed to create cache")
 	}
 
-	disconnected := make(chan error, 1)
-
-	disconnected <- fmt.Errorf("simulated disconnection")
+	k8sSession := &K8sSession{
+		Clients:        &client.Clients{},
+		Cache:          &resources.Cache{},
+		Cancel:         func() {},
+		CurrentCtx:     "original-context",
+		CurrentCluster: "original-cluster",
+		createCache:    createCacheMock,
+		createClient:   createClientMock,
+	}
 
 	// Run the handleReconnection function in a goroutine
-	go k8sSession.HandleReconnection(disconnected, createClientMock, createCacheMock)
+	go k8sSession.HandleReconnection()
 
 	// Wait for the reconnection logic to complete
 	time.Sleep(200 * time.Millisecond)
@@ -141,8 +130,6 @@ func TestHandleReconnectionCreateCacheError(t *testing.T) {
 	// Verify that the K8sResources cache was not updated since cache creation failed
 	require.Nil(t, k8sSession.Clients.Clientset)
 	require.Nil(t, k8sSession.Cache.Pods)
-
-	close(disconnected)
 }
 
 func TestHandleReconnectionContextChanged(t *testing.T) {
@@ -154,14 +141,6 @@ func TestHandleReconnectionContextChanged(t *testing.T) {
 		return "new-context", "new-cluster", nil
 	}
 
-	k8sSession := &K8sSession{
-		Clients:        &client.Clients{},
-		Cache:          &resources.Cache{},
-		Cancel:         func() {},
-		CurrentCtx:     "original-context",
-		CurrentCluster: "original-cluster",
-	}
-
 	createClientMock := func() (*client.Clients, error) {
 		return &client.Clients{Clientset: &kubernetes.Clientset{}}, nil
 	}
@@ -170,13 +149,18 @@ func TestHandleReconnectionContextChanged(t *testing.T) {
 		return &resources.Cache{Pods: &resources.ResourceList{}}, nil
 	}
 
-	disconnected := make(chan error, 1)
-
-	// Simulate a disconnection
-	disconnected <- fmt.Errorf("simulated disconnection")
+	k8sSession := &K8sSession{
+		Clients:        &client.Clients{},
+		Cache:          &resources.Cache{},
+		Cancel:         func() {},
+		CurrentCtx:     "original-context",
+		CurrentCluster: "original-cluster",
+		createCache:    createCacheMock,
+		createClient:   createClientMock,
+	}
 
 	// Run the handleReconnection function in a goroutine
-	go k8sSession.HandleReconnection(disconnected, createClientMock, createCacheMock)
+	go k8sSession.HandleReconnection()
 
 	// Wait for the reconnection logic to complete
 	time.Sleep(200 * time.Millisecond)
@@ -184,6 +168,4 @@ func TestHandleReconnectionContextChanged(t *testing.T) {
 	// Verify that the K8sResources struct was not updated since the context/cluster has changed
 	require.Nil(t, k8sSession.Clients.Clientset)
 	require.Nil(t, k8sSession.Cache.Pods)
-
-	close(disconnected)
 }
diff --git a/tasks/build.yaml b/tasks/build.yaml
index f97d87ea..0eea1ad3 100644
--- a/tasks/build.yaml
+++ b/tasks/build.yaml
@@ -91,4 +91,4 @@ tasks:
       - task: api-linux-amd64
      - cmd: docker build --build-arg TARGETARCH=amd64 -t uds-runtime:smoke-test -f ./Dockerfile .
       - cmd: uds zarf package create -o build --confirm
-        dir: ${DIR}
+        dir: hack/smoke-test
diff --git a/tasks/test.yaml b/tasks/test.yaml
index 7f6b085a..32b6b546 100644
--- a/tasks/test.yaml
+++ b/tasks/test.yaml
@@ -90,7 +90,7 @@ tasks:
       - task: build:smoke-img-pkg
      - task: setup:slim-cluster
       - cmd: uds zarf package deploy build/zarf-package-uds-runtime-amd64-smoke-test.tar.zst --confirm
-        dir: ${DIR}
+        dir: hack/smoke-test
       - description: Validate Runtime Pod
         wait:
           cluster:
diff --git a/ui/src/lib/components/k8s/DataTable/component.svelte b/ui/src/lib/components/k8s/DataTable/component.svelte
index de49111b..21aa2365 100644
--- a/ui/src/lib/components/k8s/DataTable/component.svelte
+++ b/ui/src/lib/components/k8s/DataTable/component.svelte
@@ -127,6 +127,7 @@
       console.log('Cluster reconnected, restarting store')
       // stop current store first
       stop()
+      // recreate rows to trigger re-render
       rows = createStore()
       stop = rows.start()
diff --git a/ui/src/lib/features/toast/component.svelte b/ui/src/lib/features/toast/component.svelte
index 2fc38c26..75b6a696 100644
--- a/ui/src/lib/features/toast/component.svelte
+++ b/ui/src/lib/features/toast/component.svelte
@@ -4,7 +4,7 @@
   import { removeToast, toast } from './store'
-