Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failfast option to ssa.Wait #609

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ssa/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions ssa/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
Expand Down
2 changes: 1 addition & 1 deletion ssa/manager_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc
}
changeSet.Append(cs.Entries)

if err := m.Wait(stageOne, WaitOptions{2 * time.Second, opts.WaitTimeout}); err != nil {
if err := m.Wait(stageOne, WaitOptions{2 * time.Second, opts.WaitTimeout, false}); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion ssa/manager_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestDelete(t *testing.T) {
t.Error(err)
}

if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second}); err != nil {
if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second, false}); err != nil {
// workaround for https://github.com/kubernetes-sigs/controller-runtime/issues/880
if !strings.Contains(err.Error(), "Namespace/") {
t.Error(err)
Expand Down
21 changes: 18 additions & 3 deletions ssa/manager_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type WaitOptions struct {
// Timeout defines the interval after which the engine should give up on waiting for resources
// to become ready.
Timeout time.Duration

// FailFast makes the Wait function return an error as soon as a resource reaches the failed state.
FailFast bool
}

// DefaultWaitOptions returns the default wait options where the poll interval is set to
Expand Down Expand Up @@ -77,10 +80,12 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
eventsChan := m.poller.Poll(ctx, set, pollingOpts)

lastStatus := make(map[object.ObjMetadata]*event.ResourceStatus)
var failedResources int

done := statusCollector.ListenWithObserver(eventsChan, collector.ObserverFunc(
func(statusCollector *collector.ResourceStatusCollector, e event.Event) {
var rss []*event.ResourceStatus
var countFailed int
for _, rs := range statusCollector.ResourceStatuses {
if rs == nil {
continue
Expand All @@ -91,12 +96,17 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
if rs.Error != context.DeadlineExceeded {
lastStatus[rs.Identifier] = rs
}

if rs.Status == status.FailedStatus {
countFailed++
}
rss = append(rss, rs)
}
failedResources = countFailed

desired := status.CurrentStatus
aggStatus := aggregator.AggregateStatus(rss, desired)
if aggStatus == desired {
if aggStatus == desired || (opts.FailFast && countFailed > 0) {
cancel()
return
}
Expand All @@ -109,7 +119,12 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
return statusCollector.Error
}

if ctx.Err() == context.DeadlineExceeded {
if ctx.Err() == context.DeadlineExceeded || (opts.FailFast && failedResources > 0) {
msg := "failed early due to stalled resources"
if ctx.Err() == context.DeadlineExceeded {
msg = "timeout waiting for"
}

var errors = []string{}
for id, rs := range statusCollector.ResourceStatuses {
if rs == nil {
Expand All @@ -129,7 +144,7 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
errors = append(errors, builder.String())
}
}
return fmt.Errorf("timeout waiting for: [%s]", strings.Join(errors, ", "))
return fmt.Errorf("%s: [%s]", msg, strings.Join(errors, ", "))
stefanprodan marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand Down
168 changes: 167 additions & 1 deletion ssa/manager_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ package ssa

import (
"context"
"fmt"
"strings"
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -59,7 +65,7 @@ func TestWaitForSet(t *testing.T) {
t.Fatal(err)
}

if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second}); err == nil {
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second, false}); err == nil {
t.Error("wanted wait error due to observedGeneration < generation")
}

Expand Down Expand Up @@ -95,3 +101,163 @@ func TestWaitForSet(t *testing.T) {
}
})
}

// TestWaitForSet_failFast verifies how WaitForSet reports a stalled Deployment
// depending on WaitOptions.FailFast.
//
// The test applies testdata/test10.yaml and then writes resource status
// directly through the client: the Deployment gets a Progressing=False
// condition with reason ProgressDeadlineExceeded, and the PersistentVolumeClaim
// phase is set to Bound. It then checks three scenarios:
//
//   - FailFast=false: WaitForSet returns a "timeout waiting for" error.
//   - FailFast=true: WaitForSet returns a "failed early" error.
//   - FailFast=true while the PVC phase is Pending: WaitForSet still returns
//     a "failed early" error.
//
// In every scenario the error must mention the Deployment with FailedStatus.
func TestWaitForSet_failFast(t *testing.T) {
	timeout := 5 * time.Second
	interval := 2 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	id := generateName("failfast")
	objects, err := readManifest("testdata/test10.yaml", id)
	if err != nil {
		t.Fatal(err)
	}

	manager.SetOwnerLabels(objects, "infra", "default")
	_, pvc := getFirstObject(objects, "PersistentVolumeClaim", id)
	_, deploy := getFirstObject(objects, "Deployment", id)

	// The object metadata is used to build the expected error fragment, so a
	// conversion failure must abort the test instead of being ignored.
	deployObjMeta, err := object.RuntimeToObjMeta(deploy)
	if err != nil {
		t.Fatal(err)
	}
	deployFailedMsg := fmt.Sprintf("%s status: '%s'", FmtObjMetadata(deployObjMeta), status.FailedStatus)

	cs, err := manager.ApplyAllStaged(ctx, objects, DefaultApplyOptions())
	if err != nil {
		t.Fatal(err)
	}

	var clusterDeploy = &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      id,
			Namespace: id,
		},
	}
	if err := manager.client.Get(ctx, client.ObjectKeyFromObject(deploy), clusterDeploy); err != nil {
		t.Fatal(err)
	}

	// Set Progressing Condition to false and reason to ProgressDeadlineExceeded
	// This tells kstatus that the deployment has stalled.
	cond := appsv1.DeploymentCondition{
		Type:               appsv1.DeploymentProgressing,
		Status:             corev1.ConditionFalse,
		LastTransitionTime: metav1.Time{},
		Reason:             "ProgressDeadlineExceeded",
		Message:            "timeout progressing",
	}
	clusterDeploy.Status = appsv1.DeploymentStatus{
		ObservedGeneration:  clusterDeploy.Generation,
		Replicas:            *clusterDeploy.Spec.Replicas,
		UpdatedReplicas:     *clusterDeploy.Spec.Replicas,
		UnavailableReplicas: *clusterDeploy.Spec.Replicas,
		Conditions:          []appsv1.DeploymentCondition{cond},
	}
	if err := manager.client.Status().Update(ctx, clusterDeploy); err != nil {
		t.Fatal(err)
	}

	// setPVCPhase fetches the PVC from the cluster and patches its status
	// subresource so that status.phase equals the given value.
	setPVCPhase := func(phase string) error {
		clusterPvc := &unstructured.Unstructured{}
		clusterPvc.SetGroupVersionKind(schema.GroupVersionKind{
			Group:   "",
			Kind:    "PersistentVolumeClaim",
			Version: "v1",
		})
		if err := manager.client.Get(ctx, client.ObjectKeyFromObject(pvc), clusterPvc); err != nil {
			return err
		}

		if err := unstructured.SetNestedField(clusterPvc.Object, phase, "status", "phase"); err != nil {
			return err
		}

		opts := &client.SubResourcePatchOptions{
			PatchOptions: client.PatchOptions{
				FieldManager: manager.owner.Field,
			},
		}

		// Managed fields are cleared before sending the object as an apply patch.
		clusterPvc.SetManagedFields(nil)
		return manager.client.Status().Patch(ctx, clusterPvc, client.Apply, opts)
	}

	// assertWaitError runs WaitForSet with the given FailFast value and checks
	// that the returned error contains both wantFragment and the failed
	// Deployment status. The error is local to each call so subtests do not
	// share state.
	assertWaitError := func(t *testing.T, failFast bool, wantFragment, expectation string) {
		t.Helper()

		err := manager.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{
			Interval: interval,
			Timeout:  timeout,
			FailFast: failFast,
		})

		if err == nil || !strings.Contains(err.Error(), wantFragment) {
			t.Fatalf("%s, got: %v", expectation, err)
		}

		if !strings.Contains(err.Error(), deployFailedMsg) {
			t.Fatalf("expected error to contain status of failed deployment, got: %v", err)
		}
	}

	if err := setPVCPhase("Bound"); err != nil {
		t.Fatal(err)
	}

	t.Run("timeout when failfast is set to false", func(t *testing.T) {
		assertWaitError(t, false, "timeout waiting for",
			"expected WaitForSet to timeout waiting for deployment")
	})

	t.Run("return early when failfast is set to true", func(t *testing.T) {
		assertWaitError(t, true, "failed early",
			"expected WaitForSet to fail early due to stalled deployment")
	})

	t.Run("fail early even if there are still Progressing resources", func(t *testing.T) {
		// change status to Pending to have an 'InProgress' resource
		if err := setPVCPhase("Pending"); err != nil {
			t.Fatal(err)
		}

		assertWaitError(t, true, "failed early",
			"expected WaitForSet to fail early due to stalled deployment")
	})
}
52 changes: 52 additions & 0 deletions ssa/testdata/test10.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Fixture loaded by TestWaitForSet_failFast via readManifest("testdata/test10.yaml", id).
# "%[1]s" is a Go fmt explicit-argument-index placeholder; presumably readManifest
# substitutes the generated test id for it — confirm against readManifest.
# The documents below are: Namespace, ConfigMap, Deployment, PersistentVolumeClaim.
---
apiVersion: v1
kind: Namespace
metadata:
name: "%[1]s"
---
apiVersion: v1
data:
foo: bar
kind: ConfigMap
metadata:
labels:
name: "%[1]s"
namespace: "%[1]s"
---
# The test looks this object up by kind "Deployment" and overwrites its status
# with a Progressing=False / ProgressDeadlineExceeded condition.
apiVersion: apps/v1
kind: Deployment
metadata:
creationTimestamp: null
labels:
app: "%[1]s"
name: "%[1]s"
namespace: "%[1]s"
spec:
replicas: 1
progressDeadlineSeconds: 60
selector:
matchLabels:
app: podinfo
strategy: {}
template:
metadata:
creationTimestamp: null
labels:
app: podinfo
spec:
containers:
# NOTE(review): "nginxx" looks like a deliberately non-existent image name — confirm it is intentional.
- image: nginxx
name: nginx
---
# The test looks this object up by kind "PersistentVolumeClaim" and patches
# status.phase to "Bound" and later to "Pending".
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: "%[1]s"
namespace: "%[1]s"
spec:
storageClassName: manual
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 3Gi
Loading