Skip to content

Commit

Permalink
feat: add sparse streams, compression, single resource api endpoints (#…
Browse files Browse the repository at this point in the history
…28)

## Description

The original SSE implementation is very inefficient as it streams the
complete / reconciled list on each data change event. After exploring a
number of different strategies for optimizing this, including
differentials, sparse data + compression realized a significant
efficiency gains, > 90% typically and in some extreme cases up to 99.96%
reduction in bandwidth requirements.

Spare Streams: This PR introduces a concept of sparse and dense resource
streams. The default is now sparse and can be overriden by adding the
query param `dense=true` to any of the resource streams. Sparse streams
only return `metadata`, `status`, `type`, `data` (keys only), `kind`,
`apiVersion` fields and strip `managedFields` from `metadata` as well.
This results in much lighter data streams.

Single-Resource Endpoint: This PR also introduces the ability to call a
dense resource by it's UID, to be used in conjunction with thew new
spare streams. The url pattern is `/api/v1/resources/<kind>/uid`, in
this mode `once` and `dense` currently have no affect, this also means
we do not yet support streaming a single resource.

Compression: GZIP compressions for both streams and regular API calls
will now automatically be enabled if the client supports it.

This PR also introduces the Secrets & Configmaps UI views, which
originally triggered the performance concerns due to large binary data
stored in their resource lists.
  • Loading branch information
jeff-mccoy authored Jul 25, 2024
1 parent 084fc83 commit c5c4c9c
Show file tree
Hide file tree
Showing 24 changed files with 608 additions and 148 deletions.
143 changes: 87 additions & 56 deletions pkg/api/resources/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ import (

admissionRegV1 "k8s.io/api/admissionregistration/v1"
appsV1 "k8s.io/api/apps/v1"
autoscalingV2 "k8s.io/api/autoscaling/v2"
autoScalingV2 "k8s.io/api/autoscaling/v2"
batchV1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
networkingV1 "k8s.io/api/networking/v1"
coreV1 "k8s.io/api/core/v1"
nodeV1 "k8s.io/api/node/v1"
schedulingV1 "k8s.io/api/scheduling/v1"
storageV1 "k8s.io/api/storage/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
dynamicInformer "k8s.io/client-go/dynamic/dynamicinformer"
Expand All @@ -35,45 +33,44 @@ type Cache struct {
factory informers.SharedInformerFactory
dynamicFactory dynamicInformer.DynamicSharedInformerFactory

RuntimeClasses *ResourceList[*nodeV1.RuntimeClass]

// Core resources
Events *ResourceList[*v1.Event]
Namespaces *ResourceList[*v1.Namespace]
Nodes *ResourceList[*v1.Node]
Events *ResourceList
Namespaces *ResourceList
Nodes *ResourceList

// Workload resources
Pods *ResourceList[*v1.Pod]
Deployments *ResourceList[*appsV1.Deployment]
Daemonsets *ResourceList[*appsV1.DaemonSet]
Statefulsets *ResourceList[*appsV1.StatefulSet]
Jobs *ResourceList[*batchV1.Job]
CronJobs *ResourceList[*batchV1.CronJob]
Pods *ResourceList
Deployments *ResourceList
Daemonsets *ResourceList
Statefulsets *ResourceList
Jobs *ResourceList
CronJobs *ResourceList

// UDS resources
UDSPackages *ResourceList[*unstructured.Unstructured]
UDSExemptions *ResourceList[*unstructured.Unstructured]
UDSPackages *ResourceList
UDSExemptions *ResourceList

// Config resources
Configmaps *ResourceList[*v1.ConfigMap]
Secrets *ResourceList[*v1.Secret]
Configmaps *ResourceList
Secrets *ResourceList

// Cluster ops resources
MutatingWebhooks *ResourceList[*admissionRegV1.MutatingWebhookConfiguration]
ValidatingWebhooks *ResourceList[*admissionRegV1.ValidatingWebhookConfiguration]
HPAs *ResourceList[*autoscalingV2.HorizontalPodAutoscaler]
PriorityClasses *ResourceList[*schedulingV1.PriorityClass]
MutatingWebhooks *ResourceList
ValidatingWebhooks *ResourceList
HPAs *ResourceList
PriorityClasses *ResourceList
RuntimeClasses *ResourceList

// Network resources
Services *ResourceList[*v1.Service]
NetworkPolicies *ResourceList[*networkingV1.NetworkPolicy]
Endpoints *ResourceList[*v1.Endpoints]
VirtualServices *ResourceList[*unstructured.Unstructured]
Services *ResourceList
NetworkPolicies *ResourceList
Endpoints *ResourceList
VirtualServices *ResourceList

// Storage resources
PersistentVolumes *ResourceList[*v1.PersistentVolume]
PersistentVolumeClaims *ResourceList[*v1.PersistentVolumeClaim]
StorageClasses *ResourceList[*storageV1.StorageClass]
PersistentVolumes *ResourceList
PersistentVolumeClaims *ResourceList
StorageClasses *ResourceList

// Metrics
PodMetrics *PodMetrics
Expand Down Expand Up @@ -130,53 +127,87 @@ func NewCache(ctx context.Context) (*Cache, error) {
}

func (c *Cache) bindCoreResources() {
c.Nodes = NewResourceList[*v1.Node](c.factory.Core().V1().Nodes().Informer())
c.Events = NewResourceList[*v1.Event](c.factory.Core().V1().Events().Informer())
c.Namespaces = NewResourceList[*v1.Namespace](c.factory.Core().V1().Namespaces().Informer())
nodeGVK := coreV1.SchemeGroupVersion.WithKind("Node")
eventGVK := coreV1.SchemeGroupVersion.WithKind("Event")
namespaceGVK := coreV1.SchemeGroupVersion.WithKind("Namespace")

c.Nodes = NewResourceList(c.factory.Core().V1().Nodes().Informer(), nodeGVK)
c.Events = NewResourceList(c.factory.Core().V1().Events().Informer(), eventGVK)
c.Namespaces = NewResourceList(c.factory.Core().V1().Namespaces().Informer(), namespaceGVK)
}

func (c *Cache) bindWorkloadResources() {
c.Pods = NewResourceList[*v1.Pod](c.factory.Core().V1().Pods().Informer())
c.Deployments = NewResourceList[*appsV1.Deployment](c.factory.Apps().V1().Deployments().Informer())
c.Daemonsets = NewResourceList[*appsV1.DaemonSet](c.factory.Apps().V1().DaemonSets().Informer())
c.Statefulsets = NewResourceList[*appsV1.StatefulSet](c.factory.Apps().V1().StatefulSets().Informer())
c.Jobs = NewResourceList[*batchV1.Job](c.factory.Batch().V1().Jobs().Informer())
c.CronJobs = NewResourceList[*batchV1.CronJob](c.factory.Batch().V1().CronJobs().Informer())
podGVK := coreV1.SchemeGroupVersion.WithKind("Pod")
deploymentGVK := appsV1.SchemeGroupVersion.WithKind("Deployment")
daemonsetGVK := appsV1.SchemeGroupVersion.WithKind("DaemonSet")
statefulsetGVK := appsV1.SchemeGroupVersion.WithKind("StatefulSet")
jobGVK := batchV1.SchemeGroupVersion.WithKind("Job")
cronJobGVK := batchV1.SchemeGroupVersion.WithKind("CronJob")

c.Pods = NewResourceList(c.factory.Core().V1().Pods().Informer(), podGVK)
c.Deployments = NewResourceList(c.factory.Apps().V1().Deployments().Informer(), deploymentGVK)
c.Daemonsets = NewResourceList(c.factory.Apps().V1().DaemonSets().Informer(), daemonsetGVK)
c.Statefulsets = NewResourceList(c.factory.Apps().V1().StatefulSets().Informer(), statefulsetGVK)
c.Jobs = NewResourceList(c.factory.Batch().V1().Jobs().Informer(), jobGVK)
c.CronJobs = NewResourceList(c.factory.Batch().V1().CronJobs().Informer(), cronJobGVK)
}

func (c *Cache) bindConfigResources() {
c.Configmaps = NewResourceList[*v1.ConfigMap](c.factory.Core().V1().ConfigMaps().Informer())
c.Secrets = NewResourceList[*v1.Secret](c.factory.Core().V1().Secrets().Informer())
configMapGVK := coreV1.SchemeGroupVersion.WithKind("ConfigMap")
secretGVK := coreV1.SchemeGroupVersion.WithKind("Secret")

c.Configmaps = NewResourceList(c.factory.Core().V1().ConfigMaps().Informer(), configMapGVK)
c.Secrets = NewResourceList(c.factory.Core().V1().Secrets().Informer(), secretGVK)
}

func (c *Cache) bindClusterOpsResources() {
c.MutatingWebhooks = NewResourceList[*admissionRegV1.MutatingWebhookConfiguration](c.factory.Admissionregistration().V1().MutatingWebhookConfigurations().Informer())
c.ValidatingWebhooks = NewResourceList[*admissionRegV1.ValidatingWebhookConfiguration](c.factory.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer())
c.HPAs = NewResourceList[*autoscalingV2.HorizontalPodAutoscaler](c.factory.Autoscaling().V2().HorizontalPodAutoscalers().Informer())
c.RuntimeClasses = NewResourceList[*nodeV1.RuntimeClass](c.factory.Node().V1().RuntimeClasses().Informer())
c.PriorityClasses = NewResourceList[*schedulingV1.PriorityClass](c.factory.Scheduling().V1().PriorityClasses().Informer())
mutatingWebhookGVK := admissionRegV1.SchemeGroupVersion.WithKind("MutatingWebhookConfiguration")
validatingWebhookGVK := admissionRegV1.SchemeGroupVersion.WithKind("ValidatingWebhookConfiguration")
hpaGVK := autoScalingV2.SchemeGroupVersion.WithKind("HorizontalPodAutoscaler")
runtimeClassGVK := nodeV1.SchemeGroupVersion.WithKind("RuntimeClass")
priorityClassGVK := schedulingV1.SchemeGroupVersion.WithKind("PriorityClass")

c.MutatingWebhooks = NewResourceList(c.factory.Admissionregistration().V1().MutatingWebhookConfigurations().Informer(), mutatingWebhookGVK)
c.ValidatingWebhooks = NewResourceList(c.factory.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer(), validatingWebhookGVK)
c.HPAs = NewResourceList(c.factory.Autoscaling().V2().HorizontalPodAutoscalers().Informer(), hpaGVK)
c.RuntimeClasses = NewResourceList(c.factory.Node().V1().RuntimeClasses().Informer(), runtimeClassGVK)
c.PriorityClasses = NewResourceList(c.factory.Scheduling().V1().PriorityClasses().Informer(), priorityClassGVK)
}

func (c *Cache) bindNetworkResources() {
c.Services = NewResourceList[*v1.Service](c.factory.Core().V1().Services().Informer())
c.NetworkPolicies = NewResourceList[*networkingV1.NetworkPolicy](c.factory.Networking().V1().NetworkPolicies().Informer())
c.Endpoints = NewResourceList[*v1.Endpoints](c.factory.Core().V1().Endpoints().Informer())
serviceGVK := coreV1.SchemeGroupVersion.WithKind("Service")
networkPolicyGVK := coreV1.SchemeGroupVersion.WithKind("NetworkPolicy")
endpointGVK := coreV1.SchemeGroupVersion.WithKind("Endpoints")
isitoVSGVK := schema.FromAPIVersionAndKind("networking.istio.io/v1", "VirtualService")

c.Services = NewResourceList(c.factory.Core().V1().Services().Informer(), serviceGVK)
c.NetworkPolicies = NewResourceList(c.factory.Networking().V1().NetworkPolicies().Informer(), networkPolicyGVK)
c.Endpoints = NewResourceList(c.factory.Core().V1().Endpoints().Informer(), endpointGVK)

// VirtualServices are not part of the core informer factory
c.VirtualServices = NewResourceList[*unstructured.Unstructured](c.dynamicFactory.ForResource(schema.GroupVersionResource{
istioVSGVR := schema.GroupVersionResource{
Group: "networking.istio.io",
Version: "v1",
Resource: "virtualservices",
}).Informer())
}

c.VirtualServices = NewResourceList(c.dynamicFactory.ForResource(istioVSGVR).Informer(), isitoVSGVK)
}

func (c *Cache) bindStorageResources() {
c.PersistentVolumes = NewResourceList[*v1.PersistentVolume](c.factory.Core().V1().PersistentVolumes().Informer())
c.PersistentVolumeClaims = NewResourceList[*v1.PersistentVolumeClaim](c.factory.Core().V1().PersistentVolumeClaims().Informer())
c.StorageClasses = NewResourceList[*storageV1.StorageClass](c.factory.Storage().V1().StorageClasses().Informer())
persistentVolumeGVK := coreV1.SchemeGroupVersion.WithKind("PersistentVolume")
persistentVolumeClaimGVK := coreV1.SchemeGroupVersion.WithKind("PersistentVolumeClaim")
storageClassGVK := storageV1.SchemeGroupVersion.WithKind("StorageClass")

c.PersistentVolumes = NewResourceList(c.factory.Core().V1().PersistentVolumes().Informer(), persistentVolumeGVK)
c.PersistentVolumeClaims = NewResourceList(c.factory.Core().V1().PersistentVolumeClaims().Informer(), persistentVolumeClaimGVK)
c.StorageClasses = NewResourceList(c.factory.Storage().V1().StorageClasses().Informer(), storageClassGVK)
}

func (c *Cache) bindUDSResources() {
udsPackageGVK := schema.FromAPIVersionAndKind("uds.dev/v1alpha1", "Package")
udsExemptionGVK := schema.FromAPIVersionAndKind("uds.dev/v1alpha1", "Exemption")

udsPackageGVR := schema.GroupVersionResource{
Group: "uds.dev",
Version: "v1alpha1",
Expand All @@ -189,6 +220,6 @@ func (c *Cache) bindUDSResources() {
Resource: "exemptions",
}

c.UDSPackages = NewResourceList[*unstructured.Unstructured](c.dynamicFactory.ForResource(udsPackageGVR).Informer())
c.UDSExemptions = NewResourceList[*unstructured.Unstructured](c.dynamicFactory.ForResource(udsExemptionsGVR).Informer())
c.UDSPackages = NewResourceList(c.dynamicFactory.ForResource(udsPackageGVR).Informer(), udsPackageGVK)
c.UDSExemptions = NewResourceList(c.dynamicFactory.ForResource(udsExemptionsGVR).Informer(), udsExemptionGVK)
}
Loading

0 comments on commit c5c4c9c

Please sign in to comment.