Skip to content

Commit

Permalink
refactor(k8s): podTracker.Start takes a ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd committed Apr 10, 2022
1 parent 7c4f885 commit 6df6625
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion runtime/kubernetes/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
}

// Populate the PodTracker caches before creating the pipeline pod
c.PodTracker.Start(ctx.Done())
c.PodTracker.Start(ctx)

if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
Expand Down
8 changes: 4 additions & 4 deletions runtime/kubernetes/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,19 +398,19 @@ func TestKubernetes_WaitContainer(t *testing.T) {

// run tests
for _, test := range tests {
// anonymous function to allow "defer close(stopCh)" on each iteration
// anonymous function to allow "defer done()" on each iteration
func() {
// set up the fake k8s clientset so that it returns the final/updated state
_engine, err := NewMock(test.updated)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

stopCh := make(chan struct{})
defer close(stopCh)
ctx, done := context.WithCancel(context.Background())
defer done()

// enable the add/update/delete funcs for pod changes
_engine.PodTracker.Start(stopCh)
_engine.PodTracker.Start(ctx)

go func() {
// revert the cached pod to an "older" version
Expand Down
5 changes: 3 additions & 2 deletions runtime/kubernetes/pod_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kubernetes

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -133,11 +134,11 @@ func (p podTracker) getTrackedPod(obj interface{}) *v1.Pod {

// Start kicks off the API calls to start populating the cache.
// There is no need to run this in a separate goroutine (ie go podTracker.Start(stopCh)).
func (p podTracker) Start(stopCh <-chan struct{}) {
func (p podTracker) Start(ctx context.Context) {
p.Logger.Tracef("starting PodTracker for pod %s", p.TrackedPod)

// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
p.informerFactory.Start(stopCh)
p.informerFactory.Start(ctx.Done())
}

// newPodTracker initializes a podTracker with a given clientset for a given pod.
Expand Down

0 comments on commit 6df6625

Please sign in to comment.