diff --git a/ssa/go.mod b/ssa/go.mod index 1860ae7f..7e0d0162 100644 --- a/ssa/go.mod +++ b/ssa/go.mod @@ -35,7 +35,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/btree v1.0.1 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect - github.com/google/gofuzz v1.1.0 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect diff --git a/ssa/go.sum b/ssa/go.sum index 3b99b913..a9594f76 100644 --- a/ssa/go.sum +++ b/ssa/go.sum @@ -75,8 +75,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g= -github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= diff --git a/ssa/manager_apply.go b/ssa/manager_apply.go index f5b35026..695629e3 100644 --- a/ssa/manager_apply.go +++ b/ssa/manager_apply.go @@ -210,7 +210,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc } changeSet.Append(cs.Entries) - if err := m.Wait(stageOne, WaitOptions{2 * time.Second, opts.WaitTimeout}); err != nil { + if err := m.Wait(stageOne, WaitOptions{2 * time.Second, opts.WaitTimeout, false}); err != nil { return nil, err } } diff --git a/ssa/manager_delete_test.go b/ssa/manager_delete_test.go index a3bf1ed6..b309930f 100644 --- a/ssa/manager_delete_test.go +++ b/ssa/manager_delete_test.go @@ -96,7 +96,7 @@ func TestDelete(t *testing.T) { t.Error(err) } - if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second}); err != nil { + if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second, false}); err != nil { // workaround for https://github.com/kubernetes-sigs/controller-runtime/issues/880 if !strings.Contains(err.Error(), "Namespace/") { t.Error(err) diff --git a/ssa/manager_wait.go b/ssa/manager_wait.go index cfb8ecd5..6bcc58c7 100644 --- a/ssa/manager_wait.go +++ b/ssa/manager_wait.go @@ -43,6 +43,9 @@ type WaitOptions struct { // Timeout defines after which interval should the engine give up on waiting for resources // to become ready. Timeout time.Duration + + // FailFast makes the Wait function return an error as soon as a resource reaches the failed state. + FailFast bool } // DefaultWaitOptions returns the default wait options where the poll interval is set to @@ -77,10 +80,12 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions eventsChan := m.poller.Poll(ctx, set, pollingOpts) lastStatus := make(map[object.ObjMetadata]*event.ResourceStatus) + var failedResources int done := statusCollector.ListenWithObserver(eventsChan, collector.ObserverFunc( func(statusCollector *collector.ResourceStatusCollector, e event.Event) { var rss []*event.ResourceStatus + var countFailed int for _, rs := range statusCollector.ResourceStatuses { if rs == nil { continue @@ -91,12 +96,17 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions if rs.Error != context.DeadlineExceeded { lastStatus[rs.Identifier] = rs } + + if rs.Status == status.FailedStatus { + countFailed++ + } rss = append(rss, rs) } + failedResources = countFailed desired := status.CurrentStatus aggStatus := aggregator.AggregateStatus(rss, desired) - if aggStatus == desired { + if aggStatus == desired || (opts.FailFast && countFailed > 0) { cancel() return } @@ -109,7 +119,12 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions return statusCollector.Error } - if ctx.Err() == context.DeadlineExceeded { + if ctx.Err() == context.DeadlineExceeded || (opts.FailFast && failedResources > 0) { + msg := "failed early due to stalled resources" + if ctx.Err() == context.DeadlineExceeded { + msg = "timeout waiting for" + } + var errors = []string{} for id, rs := range statusCollector.ResourceStatuses { if rs == nil { @@ -129,7 +144,7 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions errors = append(errors, builder.String()) } } - return fmt.Errorf("timeout waiting for: [%s]", strings.Join(errors, ", ")) + return fmt.Errorf("%s: [%s]", msg, strings.Join(errors, ", ")) } return nil diff --git a/ssa/manager_wait_test.go b/ssa/manager_wait_test.go index 8fbc8d8e..518abda8 100644 --- a/ssa/manager_wait_test.go +++ b/ssa/manager_wait_test.go @@ -19,11 +19,17 @@ package ssa import ( "context" + "fmt" + "strings" "testing" "time" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -59,7 +65,7 @@ func TestWaitForSet(t *testing.T) { t.Fatal(err) } - if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second}); err == nil { + if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second, false}); err == nil { t.Error("wanted wait error due to observedGeneration < generation") } @@ -95,3 +101,163 @@ func TestWaitForSet(t *testing.T) { } }) } + +func TestWaitForSet_failFast(t *testing.T) { + timeout := 5 * time.Second + interval := 2 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + id := generateName("failfast") + objects, err := readManifest("testdata/test10.yaml", id) + if err != nil { + t.Fatal(err) + } + + manager.SetOwnerLabels(objects, "infra", "default") + _, pvc := getFirstObject(objects, "PersistentVolumeClaim", id) + _, deploy := getFirstObject(objects, "Deployment", id) + + deployObjMeta, _ := object.RuntimeToObjMeta(deploy) + + cs, err := manager.ApplyAllStaged(ctx, objects, DefaultApplyOptions()) + if err != nil { + t.Fatal(err) + } + + var clusterDeploy = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: id, + Namespace: id, + }, + } + err = manager.client.Get(ctx, client.ObjectKeyFromObject(deploy), clusterDeploy) + if err != nil { + t.Fatal(err) + } + + // Set Progressing Condition to false and reason to ProgressDeadlineExceeded + // This tells kstatus that the deployment has stalled. + cond := appsv1.DeploymentCondition{ + Type: appsv1.DeploymentProgressing, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{}, + Reason: "ProgressDeadlineExceeded", + Message: "timeout progressing", + } + clusterDeploy.Status = appsv1.DeploymentStatus{ + ObservedGeneration: clusterDeploy.Generation, + Replicas: *clusterDeploy.Spec.Replicas, + UpdatedReplicas: *clusterDeploy.Spec.Replicas, + UnavailableReplicas: *clusterDeploy.Spec.Replicas, + Conditions: []appsv1.DeploymentCondition{cond}, + } + err = manager.client.Status().Update(ctx, clusterDeploy) + if err != nil { + t.Fatal(err) + } + + clusterPvc := &unstructured.Unstructured{} + clusterPvc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Kind: "PersistentVolumeClaim", + Version: "v1", + }) + if err := manager.client.Get(ctx, client.ObjectKeyFromObject(pvc), clusterPvc); err != nil { + t.Fatal(err) + } + + if err := unstructured.SetNestedField(clusterPvc.Object, "Bound", "status", "phase"); err != nil { + t.Fatal(err) + } + + opts := &client.SubResourcePatchOptions{ + PatchOptions: client.PatchOptions{ + FieldManager: manager.owner.Field, + }, + } + + clusterPvc.SetManagedFields(nil) + if err := manager.client.Status().Patch(ctx, clusterPvc, client.Apply, opts); err != nil { + t.Fatal(err) + } + + t.Run("timeout when failfast is set to false", func(t *testing.T) { + err = manager.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{ + Interval: interval, + Timeout: timeout, + FailFast: false, + }) + + deployFailedMsg := fmt.Sprintf("%s status: '%s'", FmtObjMetadata(deployObjMeta), status.FailedStatus) + + if err == nil || !strings.Contains(err.Error(), "timeout waiting for") { + t.Fatal("expected WaitForSet to timeout waiting for deployment") + } + + if !strings.Contains(err.Error(), deployFailedMsg) { + t.Fatal("expected error to contain status of failed deployment") + } + }) + + t.Run("return early when failfast is set to true", func(t *testing.T) { + err = manager.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{ + Interval: interval, + Timeout: timeout, + FailFast: true, + }) + + deployFailedMsg := fmt.Sprintf("%s status: '%s'", FmtObjMetadata(deployObjMeta), status.FailedStatus) + + if err == nil || !strings.Contains(err.Error(), "failed early") { + t.Fatal("expected WaitForSet to fail early due to stalled deployment") + } + + if !strings.Contains(err.Error(), deployFailedMsg) { + t.Fatal("expected error to contain status of failed deployment") + } + }) + + t.Run("fail early even if there are still Progressing resources", func(t *testing.T) { + // change status to Pending to have an 'InProgress' resource + clusterPvc := &unstructured.Unstructured{} + clusterPvc.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Kind: "PersistentVolumeClaim", + Version: "v1", + }) + if err := manager.client.Get(ctx, client.ObjectKeyFromObject(pvc), clusterPvc); err != nil { + t.Fatal(err) + } + + if err := unstructured.SetNestedField(clusterPvc.Object, "Pending", "status", "phase"); err != nil { + t.Fatal(err) + } + opts := &client.SubResourcePatchOptions{ + PatchOptions: client.PatchOptions{ + FieldManager: manager.owner.Field, + }, + } + + clusterPvc.SetManagedFields(nil) + if err := manager.client.Status().Patch(ctx, clusterPvc, client.Apply, opts); err != nil { + t.Fatal(err) + } + + err = manager.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{ + Interval: interval, + Timeout: timeout, + FailFast: true, + }) + + deployFailedMsg := fmt.Sprintf("%s status: '%s'", FmtObjMetadata(deployObjMeta), status.FailedStatus) + + if err == nil || !strings.Contains(err.Error(), "failed early") { + t.Fatal("expected WaitForSet to fail early due to stalled deployment") + } + + if !strings.Contains(err.Error(), deployFailedMsg) { + t.Fatal("expected error to contain status of failed deployment") + } + }) +} diff --git a/ssa/testdata/test10.yaml b/ssa/testdata/test10.yaml new file mode 100644 index 00000000..2aff2e9b --- /dev/null +++ b/ssa/testdata/test10.yaml @@ -0,0 +1,52 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: "%[1]s" +--- +apiVersion: v1 +data: + foo: bar +kind: ConfigMap +metadata: + labels: + name: "%[1]s" + namespace: "%[1]s" +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + creationTimestamp: null + labels: + app: "%[1]s" + name: "%[1]s" + namespace: "%[1]s" +spec: + replicas: 1 + progressDeadlineSeconds: 60 + selector: + matchLabels: + app: podinfo + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + app: podinfo + spec: + containers: + - image: nginxx + name: nginx +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: "%[1]s" + namespace: "%[1]s" +spec: + storageClassName: manual + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 3Gi