Skip to content

Commit

Permalink
feat: alert users to cluster disconnection and reconnect (#270)
Browse files Browse the repository at this point in the history
Co-authored-by: decleaver <85503726+decleaver@users.noreply.github.com>
  • Loading branch information
TristanHoladay and decleaver authored Sep 17, 2024
1 parent 734cc33 commit d121bdc
Show file tree
Hide file tree
Showing 14 changed files with 724 additions and 88 deletions.
69 changes: 69 additions & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package api

import (
"encoding/json"
"fmt"
"net/http"
"time"

_ "github.com/defenseunicorns/uds-runtime/pkg/api/docs" //nolint:staticcheck
"github.com/defenseunicorns/uds-runtime/pkg/api/resources"
Expand Down Expand Up @@ -857,3 +860,69 @@ func getStorageClasses(cache *resources.Cache) func(w http.ResponseWriter, r *ht
func getStorageClass(cache *resources.Cache) func(w http.ResponseWriter, r *http.Request) {
return rest.Bind(cache.StorageClasses)
}

// @Description Get Cluster Connection Health
// @Tags cluster-health
// @Produce json
// @Success 200
// @Router /health [get]
func checkHealth(k8sResources *K8sResources, disconnected chan error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Set headers to keep connection alive
rest.WriteHeaders(w)

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

recovering := false

checkCluster := func() {
versionInfo, err := k8sResources.Client.Clientset.ServerVersion()
response := map[string]string{}

// if err then connection is lost
if err != nil {
response["error"] = err.Error()
w.WriteHeader(http.StatusInternalServerError)
disconnected <- err
// indicate that the reconnection handler should have been triggered by the disconnected channel
recovering = true
} else if recovering {
// if errors are resolved, send a reconnected message
response["reconnected"] = versionInfo.String()
recovering = false
} else {
response["success"] = versionInfo.String()
w.WriteHeader(http.StatusOK)
}

data, err := json.Marshal(response)
if err != nil {
http.Error(w, fmt.Sprintf("data: Error: %v\n\n", err), http.StatusInternalServerError)
return
}

// Write the data to the response
fmt.Fprintf(w, "data: %s\n\n", data)

// Flush the response to ensure it is sent to the client
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}

// Check the cluster immediately
checkCluster()

for {
select {
case <-ticker.C:
checkCluster()

case <-r.Context().Done():
// Client closed the connection
return
}
}
}
}
7 changes: 1 addition & 6 deletions pkg/api/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ type Cache struct {
MetricsChanges chan struct{}
}

func NewCache(ctx context.Context) (*Cache, error) {
k8s, err := k8s.NewClient()
if err != nil {
return nil, fmt.Errorf("unable to connect to the cluster: %v", err)
}

func NewCache(ctx context.Context, k8s *k8s.Clients) (*Cache, error) {
c := &Cache{
factory: informers.NewSharedInformerFactory(k8s.Clientset, time.Minute*10),
stopper: make(chan struct{}),
Expand Down
Loading

0 comments on commit d121bdc

Please sign in to comment.