Skip to content

Commit

Permalink
patch batch index to pods during rollout (openkruise#43)
Browse files Browse the repository at this point in the history
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
  • Loading branch information
veophi and mingzhou.swx authored Jun 10, 2022
1 parent 53d32dc commit 4bd51e0
Show file tree
Hide file tree
Showing 21 changed files with 672 additions and 136 deletions.
6 changes: 1 addition & 5 deletions api/v1alpha1/batchrelease_plan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ type ReleasePlan struct {
// BatchPartition start from 0.
// +optional
BatchPartition *int32 `json:"batchPartition,omitempty"`
// Paused the rollout, the release progress will be paused util paused is false.
// default is false
// +optional
Paused bool `json:"paused,omitempty"`
}

// ReleaseBatch is used to describe how each batch release should be
Expand Down Expand Up @@ -82,7 +78,7 @@ type BatchReleaseStatus struct {
// newest canary Deployment.
// +optional
CollisionCount *int32 `json:"collisionCount,omitempty"`
// ObservedReleasePlanHash is a hash code of observed itself releasePlan.Batches.
// ObservedReleasePlanHash is a hash code of observed itself spec.releasePlan.
ObservedReleasePlanHash string `json:"observedReleasePlanHash,omitempty"`
// Phase is the release plan phase, which indicates the current state of release
// plan state machine in BatchRelease controller.
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/batchrelease_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type BatchReleaseSpec struct {
TargetRef ObjectRef `json:"targetReference"`
// ReleasePlan is the details on how to rollout the resources
ReleasePlan ReleasePlan `json:"releasePlan"`
// Paused the rollout, the release progress will be paused util paused is false.
// default is false
// +optional
Paused bool `json:"paused,omitempty"`
}

type DeploymentReleaseStrategyType string
Expand Down
10 changes: 5 additions & 5 deletions config/crd/bases/rollouts.kruise.io_batchreleases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ spec:
description: BatchReleaseSpec defines how to describe an update between
different compRevision
properties:
paused:
description: Paused the rollout, the release progress will be paused
util paused is false. default is false
type: boolean
releasePlan:
description: ReleasePlan is the details on how to rollout the resources
properties:
Expand Down Expand Up @@ -93,10 +97,6 @@ spec:
- canaryReplicas
type: object
type: array
paused:
description: Paused the rollout, the release progress will be
paused util paused is false. default is false
type: boolean
type: object
targetReference:
description: TargetRef contains the GVK and name of the workload that
Expand Down Expand Up @@ -211,7 +211,7 @@ spec:
type: integer
observedReleasePlanHash:
description: ObservedReleasePlanHash is a hash code of observed itself
releasePlan.Batches.
spec.releasePlan.
type: string
observedWorkloadReplicas:
description: ObservedWorkloadReplicas is observed replicas of target
Expand Down
4 changes: 2 additions & 2 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ spec:
resources:
limits:
cpu: 100m
memory: 30Mi
memory: 100Mi
requests:
cpu: 100m
memory: 20Mi
memory: 100Mi
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
18 changes: 13 additions & 5 deletions pkg/controller/batchrelease/batchrelease_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,22 @@ type podEventHandler struct {
}

func (p podEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
pod, ok := evt.Object.(*corev1.Pod)
if !ok {
return
}
p.enqueue(pod, q)
}
func (p podEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (p podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
}
func (p podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
oldPod := evt.ObjectOld.(*corev1.Pod)
newPod := evt.ObjectNew.(*corev1.Pod)
oldPod, oldOK := evt.ObjectOld.(*corev1.Pod)
newPod, newOK := evt.ObjectNew.(*corev1.Pod)
if !oldOK || !newOK {
return
}
if oldPod.ResourceVersion == newPod.ResourceVersion || util.IsPodReady(oldPod) == util.IsPodReady(newPod) {
return
}
Expand All @@ -79,9 +87,9 @@ func (p podEventHandler) enqueue(pod *corev1.Pod, q workqueue.RateLimitingInterf
Name: owner.Name, Namespace: pod.Namespace,
}
workloadGVK := schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind)
workloadObj := util.GetEmptyWorkloadObject(workloadGVK)
err := p.Get(context.TODO(), workloadNamespacedName, workloadObj)
if err != nil {
workloadObj, err := util.GetOwnerWorkload(p.Reader, pod)
if err != nil || workloadObj == nil {
klog.Errorf("Failed to get owner workload for pod %v, err: %v", client.ObjectKeyFromObject(pod), err)
return
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/batchrelease/batchrelease_plan_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
"github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/controller/batchrelease/workloads"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -261,6 +262,12 @@ func (r *Executor) GetWorkloadController() (workloads.WorkloadController, error)
}

gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind)
if !util.IsSupportedWorkload(gvk) {
message := fmt.Sprintf("the workload type '%v' is not supported", gvk)
r.recorder.Event(r.release, v1.EventTypeWarning, "UnsupportedWorkload", message)
return nil, fmt.Errorf(message)
}

targetKey := types.NamespacedName{
Namespace: r.release.Namespace,
Name: targetRef.Name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (r *Executor) checkHealthBeforeExecution(controller workloads.WorkloadContr
message = "Release plan is deleted or cancelled, then terminate"
signalTerminating(r.releaseStatus)

case isPlanPaused(workloadEvent, r.releasePlan, r.releaseStatus):
case isPlanPaused(workloadEvent, r.release, r.releaseStatus):
// handle the case that releasePlan.paused = true
reason = "PlanPaused"
message = "release plan is paused, then stop reconcile"
Expand Down Expand Up @@ -184,8 +184,8 @@ func isPlanUnhealthy(plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseSt
return int(status.CanaryStatus.CurrentBatch) >= len(plan.Batches) && status.Phase == v1alpha1.RolloutPhaseProgressing
}

func isPlanPaused(event workloads.WorkloadEventType, plan *v1alpha1.ReleasePlan, status *v1alpha1.BatchReleaseStatus) bool {
return plan.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status)
func isPlanPaused(event workloads.WorkloadEventType, release *v1alpha1.BatchRelease, status *v1alpha1.BatchReleaseStatus) bool {
return release.Spec.Paused && status.Phase == v1alpha1.RolloutPhaseProgressing && !isWorkloadGone(event, status)
}

func isGetWorkloadInfoError(err error) bool {
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/batchrelease/workloads/cloneset_control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ func (c *CloneSetRolloutController) UpgradeOneBatch() (bool, error) {
}
}

// patch current batch label to pods
patchDone, err := c.patchPodBatchLabel(canaryGoal)
if !patchDone || err != nil {
return false, err
}

c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "SetBatchDone",
"Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
Expand Down Expand Up @@ -330,3 +336,20 @@ func (c *CloneSetRolloutController) recordCloneSetRevisionAndReplicas() {
c.releaseStatus.StableRevision = c.clone.Status.CurrentRevision
c.releaseStatus.UpdateRevision = c.clone.Status.UpdateRevision
}

func (c *CloneSetRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" {
return true, nil
}

pods, err := util.ListOwnedPods(c.client, c.clone)
if err != nil {
klog.Errorf("Failed to list pods for CloneSet %v", c.targetNamespacedName)
return false, err
}

batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releasePlanKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func (c *DeploymentsRolloutController) UpgradeOneBatch() (bool, error) {
}
}

// patch current batch label to pods
patchDone, err := c.patchPodBatchLabel(canaryGoal)
if !patchDone || err != nil {
return false, err
}

c.recorder.Eventf(c.parentController, v1.EventTypeNormal, "Batch Rollout", "Finished submitting all upgrade quests for batch %d", c.releaseStatus.CanaryStatus.CurrentBatch)
return true, nil
}
Expand Down Expand Up @@ -364,3 +370,20 @@ func (c *DeploymentsRolloutController) recordDeploymentRevisionAndReplicas() err
c.releaseStatus.ObservedWorkloadReplicas = *c.stable.Spec.Replicas
return nil
}

func (c *DeploymentsRolloutController) patchPodBatchLabel(canaryGoal int32) (bool, error) {
rolloutID, exist := c.parentController.Labels[util.RolloutIDLabel]
if !exist || rolloutID == "" || c.canary == nil {
return true, nil
}

pods, err := util.ListOwnedPods(c.client, c.canary)
if err != nil {
klog.Errorf("Failed to list pods for Deployment %v", c.stableNamespacedName)
return false, err
}

batchID := c.parentController.Status.CanaryStatus.CurrentBatch + 1
updateRevision := c.parentController.Status.UpdateRevision
return util.PatchPodBatchLabel(c.client, pods, rolloutID, batchID, updateRevision, canaryGoal, c.releaseKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package workloads

import (
"context"
"fmt"

appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -75,6 +73,14 @@ func (c *StatefulSetLikeController) GetWorkloadInfo() (*util.WorkloadInfo, error

workloadInfo := util.ParseStatefulSetInfo(set, c.namespacedName)
workloadInfo.Paused = true
if workloadInfo.Status.UpdatedReadyReplicas <= 0 {
updatedReadyReplicas, err := c.countUpdatedReadyPods(workloadInfo.Status.UpdateRevision)
if err != nil {
return nil, err
}
workloadInfo.Status.UpdatedReadyReplicas = updatedReadyReplicas
}

return workloadInfo, nil
}

Expand All @@ -85,6 +91,7 @@ func (c *StatefulSetLikeController) ClaimWorkload() (bool, error) {
}

err = util.ClaimWorkload(c.Client, c.planController, set, map[string]interface{}{
"type": apps.RollingUpdateStatefulSetStrategyType,
"rollingUpdate": map[string]interface{}{
"partition": pointer.Int32(util.ParseReplicasFrom(set)),
},
Expand Down Expand Up @@ -122,9 +129,9 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli
return false, err
}

observedReplicas := canaryReplicasGoal + stableReplicasGoal
if observedReplicas != util.ParseReplicasFrom(set) {
return false, fmt.Errorf("StatefulSet(%v) scaled, should handle scale event first", c.namespacedName)
// if no needs to patch partition
if isStatefulSetUpgradedDone(set, canaryReplicasGoal, stableReplicasGoal) {
return true, nil
}

err = util.PatchSpec(c.Client, set, map[string]interface{}{
Expand All @@ -142,20 +149,6 @@ func (c *StatefulSetLikeController) UpgradeBatch(canaryReplicasGoal, stableRepli
return true, nil
}

func (c *StatefulSetLikeController) IsBatchUpgraded(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) {
set, err := c.GetWorkloadObject()
if err != nil {
return false, err
}

if !util.IsStatefulSetRollingUpdate(set) {
return false, fmt.Errorf("StatefulSet(%v) rollingUpdate configuration is nil, should check it manually", c.namespacedName)
}

partition := util.GetStatefulSetPartition(set)
return partition <= stableReplicasGoal, nil
}

func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableReplicasGoal int32) (bool, error) {
workloadInfo, err := c.GetWorkloadInfo()
if err != nil {
Expand Down Expand Up @@ -198,49 +191,40 @@ func (c *StatefulSetLikeController) IsBatchReady(canaryReplicasGoal, stableRepli
return secondCheckPointReady(), nil
}

func (c *StatefulSetLikeController) listOwnedPods() ([]*v1.Pod, error) {
func (c *StatefulSetLikeController) ListOwnedPods() ([]*v1.Pod, error) {
if c.pods != nil {
return c.pods, nil
}
set, err := c.GetWorkloadObject()
if err != nil {
return nil, err
}
selector, err := util.ParseSelectorFrom(set)
if err != nil || selector == nil {
return nil, err
}
podLister := &v1.PodList{}
err = c.List(context.TODO(), podLister, &client.ListOptions{LabelSelector: selector}, utilclient.DisableDeepCopy)
if err != nil {
return nil, err
}
c.pods = make([]*v1.Pod, 0)
for i := range podLister.Items {
pod := &podLister.Items[i]
if !pod.DeletionTimestamp.IsZero() {
continue
}
owner := metav1.GetControllerOf(pod)
if owner == nil || owner.UID != set.GetUID() {
continue
}
c.pods = append(c.pods, pod)
}
return c.pods, nil
c.pods, err = util.ListOwnedPods(c.Client, set)
return c.pods, err
}

func (c *StatefulSetLikeController) countUpdatedReadyPods(updateRevision string) (int32, error) {
pods, err := c.listOwnedPods()
pods, err := c.ListOwnedPods()
if err != nil {
return 0, err
}
activePods := util.FilterActivePods(pods)
updatedReadyReplicas := int32(0)
for _, pod := range pods {
for _, pod := range activePods {
if util.IsConsistentWithRevision(pod, updateRevision) && util.IsPodReady(pod) {
updatedReadyReplicas++
}
}
klog.V(3).Infof("BatchRelease(%v) observed %d updatedReadyReplicas")
return updatedReadyReplicas, nil
}

func isStatefulSetUpgradedDone(set *unstructured.Unstructured, canaryReplicasGoal, stableReplicasGoal int32) bool {
partition := util.GetStatefulSetPartition(set)
if partition <= stableReplicasGoal {
return true
}
updatedReplicas := util.ParseStatusIntFrom(set, "updatedReplicas")
observedGeneration := util.ParseStatusIntFrom(set, "observedGeneration")
return set.GetGeneration() == observedGeneration && int(updatedReplicas) >= int(canaryReplicasGoal)
}
Loading

0 comments on commit 4bd51e0

Please sign in to comment.