Refactor WaitForControlledPodsRunning
mborsz committed Aug 5, 2022
1 parent d575f0f commit 0213bea
Showing 6 changed files with 58 additions and 63 deletions.
9 changes: 4 additions & 5 deletions clusterloader2/pkg/execservice/exec_service.go
@@ -18,6 +18,7 @@ package execservice
 
 import (
     "bytes"
+    "context"
     "fmt"
     "math/rand"
     "os/exec"
@@ -92,10 +93,8 @@ func SetUpExecService(f *framework.Framework, c config.ExecServiceConfig) error
         return fmt.Errorf("pod %s creation error: %v", execDeploymentName, err)
     }
 
-    stopCh := make(chan struct{})
-    time.AfterFunc(execPodCheckTimeout, func() {
-        close(stopCh)
-    })
+    ctx, cancel := context.WithTimeout(context.TODO(), execPodCheckTimeout)
+    defer cancel()
     selector := &measurementutil.ObjectSelector{
         Namespace: execDeploymentNamespace,
         LabelSelector: execPodSelector,
@@ -110,7 +109,7 @@ func SetUpExecService(f *framework.Framework, c config.ExecServiceConfig) error
     if err != nil {
         return fmt.Errorf("pod store creation error: %v", err)
     }
-    if err = measurementutil.WaitForPods(podStore, stopCh, options); err != nil {
+    if err = measurementutil.WaitForPods(ctx, podStore, options); err != nil {
         return err
     }
     klog.V(2).Infof("%v: service set up successfully!", execServiceName)
@@ -226,10 +226,8 @@ func (npm *networkPerformanceMeasurement) createAndWaitForWorkerPods() error
         return fmt.Errorf("failed to create worked pods: %v ", err)
     }
     // Wait for all worker pods to be ready
-    stopCh := make(chan struct{})
-    time.AfterFunc(podReadyTimeout, func() {
-        close(stopCh)
-    })
+    ctx, cancel := context.WithTimeout(context.TODO(), podReadyTimeout)
+    defer cancel()
     selector := &measurementutil.ObjectSelector{Namespace: netperfNamespace}
     options := &measurementutil.WaitForPodOptions{
         DesiredPodCount: func() int { return npm.numberOfClients + npm.numberOfServers },
@@ -240,7 +238,7 @@ func (npm *networkPerformanceMeasurement) createAndWaitForWorkerPods() error
     if err != nil {
         return err
     }
-    return measurementutil.WaitForPods(podStore, stopCh, options)
+    return measurementutil.WaitForPods(ctx, podStore, options)
 }
 
 func (*networkPerformanceMeasurement) String() string {
84 changes: 41 additions & 43 deletions clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go
@@ -434,20 +434,20 @@ func (w *waitForControlledPodsRunningMeasurement) handleObjectLocked(oldObj, new
     if err != nil {
         return fmt.Errorf("meta key creation error: %v", err)
     }
-    checker, err := w.waitForRuntimeObject(handledObj, isObjDeleted)
-    if err != nil {
-        return fmt.Errorf("waiting for %v error: %v", key, err)
-    }
 
     operationTimeout := w.operationTimeout
     if isObjDeleted || isScalingDown {
         // In case of deleting pods, twice as much time is required.
         // The pod deletion throughput equals half of the pod creation throughput.
+        // NOTE: Starting from k8s 1.23 it's not true anymore, at least not in all cases.
+        // TODO(mborsz): Can we remove this?
         operationTimeout *= 2
     }
-    time.AfterFunc(operationTimeout, func() {
-        checker.terminate(true)
-    })
+
+    checker, err := w.waitForRuntimeObject(handledObj, isObjDeleted, operationTimeout)
+    if err != nil {
+        return fmt.Errorf("waiting for %v error: %v", key, err)
+    }
     w.checkerMap.Add(key, checker)
     return nil
 }
Expand Down Expand Up @@ -543,7 +543,9 @@ func (w *waitForControlledPodsRunningMeasurement) getObjectKeysAndMaxVersion() (
return objectKeys, maxResourceVersion, nil
}

func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool) (*objectChecker, error) {
func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool, operationTimeout time.Duration) (*objectChecker, error) {
ctx := context.TODO()

runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
if err != nil {
return nil, err
@@ -572,11 +574,14 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
     o.lock.Lock()
     defer o.lock.Unlock()
     w.handlingGroup.Start(func() {
-        // We cannot use o.stopCh for runtimeObjectReplicas.Start as it's not clear if it's closed on happy path (no errors, no timeout).
-        // TODO(mborsz): Migrate to o.stopCh.
-        stopCh := make(chan struct{})
-        defer close(stopCh)
-        if err := runtimeObjectReplicas.Start(stopCh); err != nil {
+        ctx, cancel := context.WithCancel(ctx)
+        defer cancel()
+        o.SetCancel(cancel)
+        if operationTimeout != time.Duration(0) {
+            ctx, cancel = context.WithTimeout(ctx, operationTimeout)
+            defer cancel()
+        }
+        if err := runtimeObjectReplicas.Start(ctx.Done()); err != nil {
             klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
             o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
             return
@@ -591,24 +596,24 @@ func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runti
 
         // This function sets the status (and error message) for the object checker.
         // The handling of bad statuses and errors is done by gather() function of the measurement.
-        err = measurementutil.WaitForPods(podStore, o.stopCh, options)
+        err := measurementutil.WaitForPods(ctx, podStore, options)
         o.lock.Lock()
         defer o.lock.Unlock()
         if err != nil {
-            if o.isRunning {
-                // Log error only if checker wasn't terminated.
-                klog.Errorf("%s: error for %v: %v", w, key, err)
-                o.err = fmt.Errorf("%s: %v", key, err)
-            }
-            if o.status == timeout {
+            klog.Errorf("%s: error for %v: %v", w, key, err)
+            o.err = fmt.Errorf("%s: %v", key, err)
+
+            hasTimedOut := ctx.Err() != nil
+            if hasTimedOut {
                 if isDeleted {
                     o.status = deleteTimeout
                 } else {
                     o.status = timeout
                 }
+                klog.Errorf("%s: %s timed out", w, key)
             }
             return
         }
-        o.isRunning = false
         if isDeleted {
             o.status = deleted
             return
@@ -630,43 +635,36 @@ const (
 )
 
 type objectChecker struct {
-    lock sync.Mutex
-    isRunning bool
-    stopCh chan struct{}
-    status objectStatus
-    err error
+    lock sync.Mutex
+    status objectStatus
+    err error
     // key of the object being checked. In the current implementation it's a namespaced name, but it
     // may change in the future.
-    key string
+    key string
+    cancel context.CancelFunc
 }
 
 func newObjectChecker(key string) *objectChecker {
     return &objectChecker{
-        stopCh: make(chan struct{}),
-        isRunning: true,
-        status: unknown,
-        key: key,
+        status: unknown,
+        key: key,
     }
 }
 
-func (o *objectChecker) Stop() {
-    o.terminate(false)
+func (o *objectChecker) SetCancel(cancel context.CancelFunc) {
+    o.lock.Lock()
+    defer o.lock.Unlock()
+    o.cancel = cancel
 }
 
-func (o *objectChecker) getStatus() (objectStatus, error) {
+func (o *objectChecker) Stop() {
     o.lock.Lock()
     defer o.lock.Unlock()
-    return o.status, o.err
+    o.cancel()
 }
 
-func (o *objectChecker) terminate(hasTimedOut bool) {
+func (o *objectChecker) getStatus() (objectStatus, error) {
     o.lock.Lock()
     defer o.lock.Unlock()
-    if o.isRunning {
-        close(o.stopCh)
-        o.isRunning = false
-        if hasTimedOut {
-            o.status = timeout
-        }
-    }
-}
+    return o.status, o.err
 }
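Taken together, the checker now owns a context.CancelFunc instead of a stop channel: the worker goroutine in waitForRuntimeObject creates the context, registers its cancel via SetCancel, and Stop simply cancels it. A rough standalone sketch of that pattern follows; everything outside the checker's methods is illustrative, and the nil guard in Stop is added for the sketch only, not something this diff shows:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// checker stores a cancel function under a mutex; Stop cancels the context the
// worker goroutine waits on, replacing the old close(stopCh) handshake.
type checker struct {
    lock   sync.Mutex
    cancel context.CancelFunc
}

func (c *checker) SetCancel(cancel context.CancelFunc) {
    c.lock.Lock()
    defer c.lock.Unlock()
    c.cancel = cancel
}

func (c *checker) Stop() {
    c.lock.Lock()
    defer c.lock.Unlock()
    if c.cancel != nil { // nil guard added for the sketch only
        c.cancel()
    }
}

func main() {
    c := &checker{}
    done := make(chan struct{})

    go func() {
        defer close(done)
        ctx, cancel := context.WithCancel(context.TODO())
        defer cancel()
        c.SetCancel(cancel)
        // Optionally bound the work, mirroring operationTimeout above.
        ctx, cancel = context.WithTimeout(ctx, time.Second)
        defer cancel()
        <-ctx.Done() // stands in for WaitForPods(ctx, podStore, options)
        fmt.Println("worker stopped:", ctx.Err())
    }()

    time.Sleep(10 * time.Millisecond)
    c.Stop() // unblocks the worker without sharing a stop channel
    <-done
}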
9 changes: 4 additions & 5 deletions clusterloader2/pkg/measurement/common/wait_for_pods.go
@@ -17,6 +17,7 @@ limitations under the License.
 package common
 
 import (
+    "context"
     "time"
 
     "k8s.io/klog"
@@ -60,10 +61,8 @@ func (w *waitForRunningPodsMeasurement) Execute(config *measurement.Config) ([]m
         return nil, err
     }
 
-    stopCh := make(chan struct{})
-    time.AfterFunc(timeout, func() {
-        close(stopCh)
-    })
+    ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+    defer cancel()
     options := &measurementutil.WaitForPodOptions{
         DesiredPodCount: func() int { return desiredPodCount },
         CallerName: w.String(),
@@ -73,7 +72,7 @@ func (w *waitForRunningPodsMeasurement) Execute(config *measurement.Config) ([]m
     if err != nil {
         return nil, err
     }
-    return nil, measurementutil.WaitForPods(podStore, stopCh, options)
+    return nil, measurementutil.WaitForPods(ctx, podStore, options)
 }
 
 // Dispose cleans up after the measurement.
@@ -50,7 +50,7 @@ const (
 type ReplicasWatcher interface {
     Replicas() int
     // Start must block until Replicas() returns a correct value.
-    Start(stopCh chan struct{}) error
+    Start(stopCh <-chan struct{}) error
 }
 
 // ConstReplicas is a ReplicasWatcher implementation that returns a constant value.
@@ -62,7 +62,7 @@ func (c *ConstReplicas) Replicas() int {
     return c.ReplicasCount
 }
 
-func (c *ConstReplicas) Start(_ chan struct{}) error {
+func (c *ConstReplicas) Start(_ <-chan struct{}) error {
     return nil
 }
 
@@ -89,7 +89,7 @@ func NewNodeCounter(client clientset.Interface, nodeSelector labels.Selector, af
     }
 }
 
-func (n *NodeCounter) Start(stopCh chan struct{}) error {
+func (n *NodeCounter) Start(stopCh <-chan struct{}) error {
     i := informer.NewInformer(
         &cache.ListWatch{
             ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
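The changes in this file only make the Start parameter receive-only. That is what allows the callers above to pass ctx.Done() directly, since Context.Done() returns a <-chan struct{}, which cannot be assigned to a plain chan struct{} parameter. A small illustrative example (the watcher type here is not the repository's):

package main

import (
    "context"
    "fmt"
)

// watcher is an illustrative stand-in for a ReplicasWatcher implementation.
type watcher struct{}

// Start only receives from stopCh, so the receive-only parameter type is both
// accurate and what lets callers hand it ctx.Done() directly.
func (w *watcher) Start(stopCh <-chan struct{}) error {
    <-stopCh
    return nil
}

func main() {
    ctx, cancel := context.WithCancel(context.TODO())
    cancel() // already cancelled, so Start returns immediately
    fmt.Println((&watcher{}).Start(ctx.Done()))
}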
5 changes: 3 additions & 2 deletions clusterloader2/pkg/measurement/util/wait_for_pods.go
@@ -17,6 +17,7 @@ limitations under the License.
 package util
 
 import (
+    "context"
     "fmt"
     "strings"
     "time"
@@ -52,7 +53,7 @@ type PodLister interface {
 
 // WaitForPods waits till desired number of pods is running.
 // The current set of pods are fetched by calling List() on the provided PodStore.
-func WaitForPods(ps PodLister, stopCh <-chan struct{}, options *WaitForPodOptions) error {
+func WaitForPods(ctx context.Context, ps PodLister, options *WaitForPodOptions) error {
     oldPods, err := ps.List()
     if err != nil {
         return fmt.Errorf("failed to list pods: %w", err)
@@ -63,7 +64,7 @@ func WaitForPods(ps PodLister, stopCh <-chan struct{}, options *WaitForPodOption
 
     for {
         select {
-        case <-stopCh:
+        case <-ctx.Done():
             desiredPodCount := options.DesiredPodCount()
             pods := ComputePodsStatus(oldPods)
             klog.V(2).Infof("%s: %s: expected %d pods, got %d pods (not RunningAndReady pods: %v)", options.CallerName, ps.String(), desiredPodCount, len(oldPods), pods.NotRunningAndReady())
