Skip to content

Commit

Permalink
Merge pull request #127 from KevFan/limits-hash-pod-annotation
Browse files Browse the repository at this point in the history
feat: immediate limits sync to pod via annotation
  • Loading branch information
KevFan authored Apr 8, 2024
2 parents cca7a77 + f0b39d5 commit c4802c5
Show file tree
Hide file tree
Showing 23 changed files with 975 additions and 875 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
- name: Create k8s Kind Cluster
uses: helm/kind-action@v1.2.0
with:
version: v0.20.0
version: v0.22.0
config: utils/kind-cluster.yaml
cluster_name: limitador-local
wait: 120s
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ opm: $(OPM) ## Download opm locally if necessary.

KIND = $(PROJECT_PATH)/bin/kind
$(KIND):
$(call go-install-tool,$(KIND),sigs.k8s.io/kind@v0.20.0)
$(call go-install-tool,$(KIND),sigs.k8s.io/kind@v0.22.0)

.PHONY: kind
kind: $(KIND) ## Download kind locally if necessary.
Expand Down
11 changes: 11 additions & 0 deletions api/v1alpha1/limitador_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
const (
DefaultServiceHTTPPort int32 = 8080
DefaultServiceGRPCPort int32 = 8081
DefaultReplicas int32 = 1

PodAnnotationConfigMapResourceVersion string = "limits-cm-resource-version"

// Status conditions
StatusConditionReady string = "Ready"
Expand Down Expand Up @@ -147,6 +150,14 @@ func (l *Limitador) GetResourceRequirements() *corev1.ResourceRequirements {
return l.Spec.ResourceRequirements
}

// GetReplicas returns the desired number of replicas for the Limitador
// deployment, falling back to DefaultReplicas when spec.Replicas is unset.
func (l *Limitador) GetReplicas() int32 {
	if replicas := l.Spec.Replicas; replicas != nil {
		return int32(*replicas)
	}
	return DefaultReplicas
}

//+kubebuilder:object:root=true

// LimitadorList contains a list of Limitador
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ spec:
- list
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- list
- update
- watch
- apiGroups:
- apps
resources:
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ rules:
- list
- update
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- list
- update
- watch
- apiGroups:
- apps
resources:
Expand Down
57 changes: 54 additions & 3 deletions controllers/limitador_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1"
"github.com/kuadrant/limitador-operator/pkg/limitador"
"github.com/kuadrant/limitador-operator/pkg/reconcilers"
upgrades "github.com/kuadrant/limitador-operator/pkg/upgrades"
"github.com/kuadrant/limitador-operator/pkg/upgrades"
)

// LimitadorReconciler reconciles a Limitador object
Expand All @@ -48,6 +50,7 @@ type LimitadorReconciler struct {
//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups="",resources=services;configmaps;secrets;persistentvolumeclaims,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups="",resources=pods,verbs=list;watch;update

func (r *LimitadorReconciler) Reconcile(eventCtx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Logger().WithValues("limitador", req.NamespacedName)
Expand All @@ -57,7 +60,7 @@ func (r *LimitadorReconciler) Reconcile(eventCtx context.Context, req ctrl.Reque
// Delete Limitador deployment and service if needed
limitadorObj := &limitadorv1alpha1.Limitador{}
if err := r.Client().Get(ctx, req.NamespacedName, limitadorObj); err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
logger.Info("no object found")
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -129,6 +132,54 @@ func (r *LimitadorReconciler) reconcileSpec(ctx context.Context, limitadorObj *l
if err := r.reconcilePdb(ctx, limitadorObj); err != nil {
return ctrl.Result{}, err
}

return r.reconcilePodLimitsHashAnnotation(ctx, limitadorObj)
}

// reconcilePodLimitsHashAnnotation stamps every Limitador pod with the
// resource version of the limits ConfigMap, so a limits change triggers an
// immediate sync to the pods. Requeues while the pod set or the ConfigMap is
// not yet in its expected state.
func (r *LimitadorReconciler) reconcilePodLimitsHashAnnotation(ctx context.Context, limitadorObj *limitadorv1alpha1.Limitador) (ctrl.Result, error) {
	podList := &v1.PodList{}
	listOpts := &client.ListOptions{
		Namespace:     limitadorObj.Namespace,
		LabelSelector: labels.SelectorFromSet(limitador.Labels(limitadorObj)),
	}
	if err := r.Client().List(ctx, podList, listOpts); err != nil {
		return ctrl.Result{}, err
	}

	// No pods yet — wait for the deployment to create them.
	if len(podList.Items) == 0 {
		return ctrl.Result{Requeue: true}, nil
	}

	// Replicas won't change if spec.Replicas goes from value to nil
	if limitadorObj.Spec.Replicas != nil && int(limitadorObj.GetReplicas()) != len(podList.Items) {
		return ctrl.Result{Requeue: true}, nil
	}

	// Use CM resource version to track limits changes
	cmKey := types.NamespacedName{
		Name:      limitador.LimitsConfigMapName(limitadorObj),
		Namespace: limitadorObj.Namespace,
	}
	cm := &v1.ConfigMap{}
	if err := r.Client().Get(ctx, cmKey, cm); err != nil {
		if apierrors.IsNotFound(err) {
			return ctrl.Result{Requeue: true}, nil
		}
		return ctrl.Result{}, err
	}

	for idx := range podList.Items {
		pod := &podList.Items[idx]
		annotations := pod.GetAnnotations()
		if annotations == nil {
			annotations = map[string]string{}
		}
		// Update only if there is a change in resource version value
		if annotations[limitadorv1alpha1.PodAnnotationConfigMapResourceVersion] == cm.ResourceVersion {
			continue
		}
		annotations[limitadorv1alpha1.PodAnnotationConfigMapResourceVersion] = cm.ResourceVersion
		pod.SetAnnotations(annotations)
		if err := r.Client().Update(ctx, pod); err != nil {
			return ctrl.Result{}, err
		}
	}

	return ctrl.Result{}, nil
}

Expand Down
92 changes: 41 additions & 51 deletions controllers/limitador_controller_affinity_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package controllers

import (
"context"
"reflect"
"time"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -17,14 +15,19 @@ import (
)

var _ = Describe("Limitador controller manages affinity", func() {

const (
nodeTimeOut = NodeTimeout(time.Second * 30)
specTimeOut = SpecTimeout(time.Minute * 2)
)
var testNamespace string

BeforeEach(func() {
CreateNamespace(&testNamespace)
})
BeforeEach(func(ctx SpecContext) {
CreateNamespaceWithContext(ctx, &testNamespace)
}, nodeTimeOut)

AfterEach(DeleteNamespaceCallback(&testNamespace))
AfterEach(func(ctx SpecContext) {
DeleteNamespaceWithContext(ctx, &testNamespace)
}, nodeTimeOut)

Context("Creating a new Limitador object with specific affinity", func() {
var limitadorObj *limitadorv1alpha1.Limitador
Expand All @@ -47,29 +50,27 @@ var _ = Describe("Limitador controller manages affinity", func() {
},
}

BeforeEach(func() {
BeforeEach(func(ctx SpecContext) {
limitadorObj = basicLimitador(testNamespace)
limitadorObj.Spec.Affinity = affinity
Expect(k8sClient.Create(context.TODO(), limitadorObj)).Should(Succeed())
Eventually(testLimitadorIsReady(limitadorObj), time.Minute, 5*time.Second).Should(BeTrue())
})
Expect(k8sClient.Create(ctx, limitadorObj)).Should(Succeed())
Eventually(testLimitadorIsReady(ctx, limitadorObj)).WithContext(ctx).Should(Succeed())
}, nodeTimeOut)

It("Should create a new deployment with the custom affinity", func() {
It("Should create a new deployment with the custom affinity", func(ctx SpecContext) {
deployment := appsv1.Deployment{}
Eventually(func() bool {
err := k8sClient.Get(
context.TODO(),
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(
ctx,
types.NamespacedName{
Namespace: testNamespace,
Name: limitador.DeploymentName(limitadorObj),
},
&deployment)

return err == nil
}, timeout, interval).Should(BeTrue())
&deployment)).To(Succeed())
}).WithContext(ctx).Should(Succeed())

Expect(deployment.Spec.Template.Spec.Affinity).To(Equal(affinity))
})
}, specTimeOut)
})

Context("Updating limitador object with new affinity settings", func() {
Expand All @@ -93,54 +94,43 @@ var _ = Describe("Limitador controller manages affinity", func() {
},
}

BeforeEach(func() {
BeforeEach(func(ctx SpecContext) {
limitadorObj = basicLimitador(testNamespace)
Expect(k8sClient.Create(context.TODO(), limitadorObj)).Should(Succeed())
Eventually(testLimitadorIsReady(limitadorObj), time.Minute, 5*time.Second).Should(BeTrue())
})
Expect(k8sClient.Create(ctx, limitadorObj)).Should(Succeed())
Eventually(testLimitadorIsReady(ctx, limitadorObj)).WithContext(ctx).Should(Succeed())
}, nodeTimeOut)

It("Should modify the deployment with the affinity custom settings", func() {
It("Should modify the deployment with the affinity custom settings", func(ctx SpecContext) {
deployment := appsv1.Deployment{}
Eventually(func() bool {
err := k8sClient.Get(context.TODO(), types.NamespacedName{
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: testNamespace,
Name: limitador.DeploymentName(limitadorObj),
}, &deployment)

return err == nil
}, timeout, interval).Should(BeTrue())
}, &deployment)).To(Succeed())
}).WithContext(ctx).Should(Succeed())

Expect(deployment.Spec.Template.Spec.Affinity).To(BeNil())

updatedLimitador := limitadorv1alpha1.Limitador{}
Eventually(func() bool {
err := k8sClient.Get(context.TODO(), types.NamespacedName{
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: testNamespace,
Name: limitadorObj.Name,
}, &updatedLimitador)

if err != nil {
return false
}
}, &updatedLimitador)).Should(Succeed())

updatedLimitador.Spec.Affinity = affinity.DeepCopy()

return k8sClient.Update(context.TODO(), &updatedLimitador) == nil
}, timeout, interval).Should(BeTrue())
g.Expect(k8sClient.Update(ctx, &updatedLimitador)).Should(Succeed())
}).WithContext(ctx).Should(Succeed())

Eventually(func() bool {
Eventually(func(g Gomega) {
newDeployment := appsv1.Deployment{}
err := k8sClient.Get(context.TODO(), types.NamespacedName{
g.Expect(k8sClient.Get(ctx, types.NamespacedName{
Namespace: testNamespace,
Name: limitador.DeploymentName(limitadorObj),
}, &newDeployment)

if err != nil {
return false
}

return reflect.DeepEqual(newDeployment.Spec.Template.Spec.Affinity, affinity)
}, timeout, interval).Should(BeTrue())
})
}, &newDeployment)).To(Succeed())
g.Expect(newDeployment.Spec.Template.Spec.Affinity).Should(Equal(affinity))
}).WithContext(ctx).Should(Succeed())
}, specTimeOut)
})
})
Loading

0 comments on commit c4802c5

Please sign in to comment.