Skip to content

Commit

Permalink
refactor(kubernetes): Refactor watch related code (#288)
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd authored Mar 15, 2022
1 parent eb5db88 commit 389b9a6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
9 changes: 6 additions & 3 deletions runtime/kubernetes/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)
Expand Down Expand Up @@ -311,7 +312,7 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err
c.Logger.Tracef("waiting for container %s", ctn.ID)

// create label selector for watching the pod
selector := fmt.Sprintf("pipeline=%s", c.Pod.ObjectMeta.Name)
selector := fmt.Sprintf("pipeline=%s", fields.EscapeValue(c.Pod.ObjectMeta.Name))

// create options for watching the container
opts := metav1.ListOptions{
Expand All @@ -325,16 +326,18 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err
// ->
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface
// nolint: contextcheck // ignore non-inherited new context
watch, err := c.Kubernetes.CoreV1().Pods(c.config.Namespace).Watch(context.Background(), opts)
podWatch, err := c.Kubernetes.CoreV1().Pods(c.config.Namespace).Watch(context.Background(), opts)
if err != nil {
return err
}

defer podWatch.Stop()

for {
// capture new result from the channel
//
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface
result := <-watch.ResultChan()
result := <-podWatch.ResultChan()

// convert the object from the result to a pod
pod, ok := result.Object.(*v1.Pod)
Expand Down
40 changes: 7 additions & 33 deletions runtime/kubernetes/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
)

func TestKubernetes_InspectContainer(t *testing.T) {
Expand Down Expand Up @@ -225,35 +222,6 @@ func TestKubernetes_TailContainer(t *testing.T) {
}

func TestKubernetes_WaitContainer(t *testing.T) {
// setup types
_engine, err := NewMock(_pod)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

// create a new fake kubernetes client
//
// https://pkg.go.dev/k8s.io/client-go/kubernetes/fake?tab=doc#NewSimpleClientset
_kubernetes := fake.NewSimpleClientset(_pod)

// create a new fake watcher
//
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#NewFake
_watch := watch.NewFake()

// create a new watch reactor with the fake watcher
//
// https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#DefaultWatchReactor
reactor := testcore.DefaultWatchReactor(_watch, nil)

// add watch reactor to beginning of the client chain
//
// https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#Fake.PrependWatchReactor
_kubernetes.PrependWatchReactor("pods", reactor)

// overwrite the mock kubernetes client
_engine.Kubernetes = _kubernetes

// setup tests
tests := []struct {
failure bool
Expand Down Expand Up @@ -314,12 +282,18 @@ func TestKubernetes_WaitContainer(t *testing.T) {

// run tests
for _, test := range tests {
// setup types
_engine, _watch, err := newMockWithWatch(_pod, "pods")
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

go func() {
// simulate adding a pod to the watcher
_watch.Add(test.object)
}()

err := _engine.WaitContainer(context.Background(), test.container)
err = _engine.WaitContainer(context.Background(), test.container)

if test.failure {
if err == nil {
Expand Down
39 changes: 39 additions & 0 deletions runtime/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
)

func TestKubernetes_New(t *testing.T) {
Expand Down Expand Up @@ -314,3 +317,39 @@ var (
},
}
)

// newMockWithWatch returns an Engine implementation that
// integrates with a Kubernetes runtime and a FakeWatcher
// that can be used to inject resource events into it.
func newMockWithWatch(pod *v1.Pod, watchResource string, opts ...ClientOpt) (*client, *watch.RaceFreeFakeWatcher, error) {
// setup types
_engine, err := NewMock(pod, opts...)
if err != nil {
return nil, nil, err
}

// create a new fake kubernetes client
//
// https://pkg.go.dev/k8s.io/client-go/kubernetes/fake?tab=doc#NewSimpleClientset
_kubernetes := fake.NewSimpleClientset(pod)

// create a new fake watcher
//
// https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#NewRaceFreeFake
_watch := watch.NewRaceFreeFake()

// create a new watch reactor with the fake watcher
//
// https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#DefaultWatchReactor
reactor := testcore.DefaultWatchReactor(_watch, nil)

// add watch reactor to beginning of the client chain
//
// https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#Fake.PrependWatchReactor
_kubernetes.PrependWatchReactor(watchResource, reactor)

// overwrite the mock kubernetes client
_engine.Kubernetes = _kubernetes

return _engine, _watch, nil
}

0 comments on commit 389b9a6

Please sign in to comment.